summaryrefslogtreecommitdiffstats
path: root/configserver/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'configserver/src/main/java/com')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java21
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java2
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java49
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java5
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java20
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java23
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java10
7 files changed, 100 insertions, 30 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
index 47d1193cd4c..068323f7784 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
@@ -197,7 +197,6 @@ public class ModelContextImpl implements ModelContext {
private final boolean useV8GeoPositions;
private final int maxCompactBuffers;
private final List<String> ignoredHttpUserAgents;
- private final boolean enableServerOcspStapling;
private final String mergeThrottlingPolicy;
private final double persistenceThrottlingWsDecrementFactor;
private final double persistenceThrottlingWsBackoff;
@@ -215,6 +214,12 @@ public class ModelContextImpl implements ModelContext {
private final boolean mbus_dispatch_on_encode;
private final int mbus_threads;
private final int mbus_network_threads;
+ private int mbus_java_num_targets;
+ private int mbus_java_events_before_wakeup;
+ private int mbus_cpp_num_targets;
+ private int mbus_cpp_events_before_wakeup;
+ private int rpc_num_targets;
+ private int rpc_events_before_wakeup;
public FeatureFlags(FlagSource source, ApplicationId appId, Version version) {
this.defaultTermwiseLimit = flagValue(source, appId, version, Flags.DEFAULT_TERM_WISE_LIMIT);
@@ -252,7 +257,6 @@ public class ModelContextImpl implements ModelContext {
this.useV8GeoPositions = flagValue(source, appId, version, Flags.USE_V8_GEO_POSITIONS);
this.maxCompactBuffers = flagValue(source, appId, version, Flags.MAX_COMPACT_BUFFERS);
this.ignoredHttpUserAgents = flagValue(source, appId, version, PermanentFlags.IGNORED_HTTP_USER_AGENTS);
- this.enableServerOcspStapling = flagValue(source, appId, version, Flags.ENABLE_SERVER_OCSP_STAPLING);
this.mergeThrottlingPolicy = flagValue(source, appId, version, Flags.MERGE_THROTTLING_POLICY);
this.persistenceThrottlingWsDecrementFactor = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_DECREMENT_FACTOR);
this.persistenceThrottlingWsBackoff = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_BACKOFF);
@@ -266,6 +270,12 @@ public class ModelContextImpl implements ModelContext {
this.enableProxyProtocolMixedMode = flagValue(source, appId, version, Flags.ENABLE_PROXY_PROTOCOL_MIXED_MODE);
this.sharedStringRepoNoReclaim = flagValue(source, appId, version, Flags.SHARED_STRING_REPO_NO_RECLAIM);
this.logFileCompressionAlgorithm = flagValue(source, appId, version, Flags.LOG_FILE_COMPRESSION_ALGORITHM);
+ this.mbus_java_num_targets = flagValue(source, appId, version, Flags.MBUS_JAVA_NUM_TARGETS);
+ this.mbus_java_events_before_wakeup = flagValue(source, appId, version, Flags.MBUS_JAVA_EVENTS_BEFORE_WAKEUP);
+ this.mbus_cpp_num_targets = flagValue(source, appId, version, Flags.MBUS_CPP_NUM_TARGETS);
+ this.mbus_cpp_events_before_wakeup = flagValue(source, appId, version, Flags.MBUS_CPP_EVENTS_BEFORE_WAKEUP);
+ this.rpc_num_targets = flagValue(source, appId, version, Flags.RPC_NUM_TARGETS);
+ this.rpc_events_before_wakeup = flagValue(source, appId, version, Flags.RPC_EVENTS_BEFORE_WAKEUP);
}
@Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; }
@@ -305,7 +315,6 @@ public class ModelContextImpl implements ModelContext {
@Override public boolean useV8GeoPositions() { return useV8GeoPositions; }
@Override public int maxCompactBuffers() { return maxCompactBuffers; }
@Override public List<String> ignoredHttpUserAgents() { return ignoredHttpUserAgents; }
- @Override public boolean enableServerOcspStapling() { return enableServerOcspStapling; }
@Override public String mergeThrottlingPolicy() { return mergeThrottlingPolicy; }
@Override public double persistenceThrottlingWsDecrementFactor() { return persistenceThrottlingWsDecrementFactor; }
@Override public double persistenceThrottlingWsBackoff() { return persistenceThrottlingWsBackoff; }
@@ -318,6 +327,12 @@ public class ModelContextImpl implements ModelContext {
@Override public Architecture adminClusterArchitecture() { return adminClusterArchitecture; }
@Override public boolean enableProxyProtocolMixedMode() { return enableProxyProtocolMixedMode; }
@Override public boolean sharedStringRepoNoReclaim() { return sharedStringRepoNoReclaim; }
+ @Override public int mbusJavaRpcNumTargets() { return mbus_java_num_targets; }
+ @Override public int mbusJavaEventsBeforeWakeup() { return mbus_java_events_before_wakeup; }
+ @Override public int mbusCppRpcNumTargets() { return mbus_cpp_num_targets; }
+ @Override public int mbusCppEventsBeforeWakeup() { return mbus_cpp_events_before_wakeup; }
+ @Override public int rpcNumTargets() { return rpc_num_targets; }
+ @Override public int rpcEventsBeforeWakeup() { return rpc_events_before_wakeup; }
@Override public String logFileCompressionAlgorithm(String defVal) {
var fflag = this.logFileCompressionAlgorithm;
if (fflag != null && ! fflag.equals("")) {
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java
index d3b7e53157d..7f120a88a05 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java
@@ -141,7 +141,7 @@ public class FileDirectory {
File destination = new File(tempDestinationDir.toFile(), source.getName());
if (!destinationDir.exists()) {
destinationDir.mkdir();
- log.log(Level.FINE, () -> "file reference ' " + reference.value() + "', source: " + source.getAbsolutePath() );
+ log.log(Level.FINE, () -> "file reference '" + reference.value() + "', source: " + source.getAbsolutePath() );
if (source.isDirectory()) {
log.log(Level.FINE, () -> "Copying source " + source.getAbsolutePath() + " to " + destination.getAbsolutePath());
IOUtils.copyDirectory(source, destination, -1);
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
index f0c34e83713..770352e6bfc 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
@@ -21,6 +21,8 @@ import com.yahoo.vespa.filedistribution.FileReferenceData;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;
import com.yahoo.vespa.filedistribution.LazyFileReferenceData;
import com.yahoo.vespa.filedistribution.LazyTemporaryStorageFileReferenceData;
+import com.yahoo.vespa.flags.FlagSource;
+import com.yahoo.vespa.flags.Flags;
import com.yahoo.yolean.Exceptions;
import java.io.File;
import java.io.IOException;
@@ -28,6 +30,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -35,12 +38,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster;
import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip;
-import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4;
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.Type;
public class FileServer {
@@ -52,6 +56,7 @@ public class FileServer {
private final FileDirectory root;
private final ExecutorService executor;
private final FileDownloader downloader;
+ private final List<CompressionType> compressionTypes; // compression types to use, in preferred order
// TODO: Move to filedistribution module, so that it can be used by both clients and servers
private enum FileApiErrorCodes {
@@ -86,21 +91,24 @@ public class FileServer {
@SuppressWarnings("WeakerAccess") // Created by dependency injection
@Inject
- public FileServer(ConfigserverConfig configserverConfig) {
+ public FileServer(ConfigserverConfig configserverConfig, FlagSource flagSource) {
this(new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())),
- createFileDownloader(getOtherConfigServersInCluster(configserverConfig)));
+ createFileDownloader(getOtherConfigServersInCluster(configserverConfig),
+ compressionTypes(Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value())),
+ compressionTypesAsList(Flags.FILE_DISTRIBUTION_COMPRESSION_TYPES_TO_SERVE.bindTo(flagSource).value()));
}
// For testing only
public FileServer(File rootDir) {
- this(rootDir, createFileDownloader(List.of()));
+ this(rootDir, createFileDownloader(List.of(), Set.of(gzip)), List.of(gzip));
}
- public FileServer(File rootDir, FileDownloader fileDownloader) {
+ FileServer(File rootDir, FileDownloader fileDownloader, List<CompressionType> compressionTypes) {
this.downloader = fileDownloader;
this.root = new FileDirectory(rootDir);
this.executor = Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()),
new DaemonThreadFactory("file-server-"));
+ this.compressionTypes = compressionTypes;
}
boolean hasFile(String fileReference) {
@@ -145,10 +153,11 @@ public class FileServer {
if (file.isDirectory()) {
Path tempFile = Files.createTempFile("filereferencedata", reference.value());
CompressionType compressionType = chooseCompressionType(acceptedCompressionTypes);
+ log.log(Level.FINE, () -> "accepted compression types=" + acceptedCompressionTypes + ", compression type to use=" + compressionType);
File compressedFile = new FileReferenceCompressor(compressed, compressionType).compress(file.getParentFile(), tempFile.toFile());
- return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile);
+ return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile, compressionType);
} else {
- return new LazyFileReferenceData(reference, file.getName(), FileReferenceData.Type.file, file);
+ return new LazyFileReferenceData(reference, file.getName(), Type.file, file, gzip);
}
}
@@ -195,9 +204,14 @@ public class FileServer {
return (fileExists ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND);
}
- // TODO: Use lz4 for testing only, add zstd when we have support for (de)compressing zstd input and output streams
+ /* Choose the first compression type (list is in preferred order) that matches an accepted compression type, or fail */
private CompressionType chooseCompressionType(Set<CompressionType> acceptedCompressionTypes) {
- return acceptedCompressionTypes.contains(lz4) ? lz4 : gzip;
+ for (CompressionType compressionType : compressionTypes) {
+ if (acceptedCompressionTypes.contains(compressionType))
+ return compressionType;
+ }
+ throw new RuntimeException("Could not find a compression type that can be used. Accepted compression types: " +
+ acceptedCompressionTypes + ", compression types server can use: " + compressionTypes);
}
boolean hasFileDownloadIfNeeded(FileReferenceDownload fileReferenceDownload) {
@@ -228,14 +242,27 @@ public class FileServer {
executor.shutdown();
}
- private static FileDownloader createFileDownloader(List<String> configServers) {
+ private static FileDownloader createFileDownloader(List<String> configServers, Set<CompressionType> acceptedCompressionTypes) {
Supervisor supervisor = new Supervisor(new Transport("filedistribution-pool")).setDropEmptyBuffers(true);
return new FileDownloader(configServers.isEmpty()
? FileDownloader.emptyConnectionPool()
: createConnectionPool(configServers, supervisor),
supervisor,
- timeout);
+ timeout,
+ acceptedCompressionTypes);
+ }
+
+ private static LinkedHashSet<CompressionType> compressionTypes(List<String> compressionTypes) {
+ return compressionTypes.stream()
+ .map(CompressionType::valueOf)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ private static List<CompressionType> compressionTypesAsList(List<String> compressionTypes) {
+ return compressionTypes.stream()
+ .map(CompressionType::valueOf)
+ .collect(Collectors.toList());
}
private static ConnectionPool createConnectionPool(List<String> configServers, Supervisor supervisor) {
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java
index f7042b49c3f..0d4baa7dc56 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/ProxyResponse.java
@@ -38,4 +38,9 @@ class ProxyResponse extends HttpResponse {
}
}
+ @Override
+ public long maxPendingBytes() {
+ return 1 << 25; // 32MB
+ }
+
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
index ae4b205c06e..12972e5c465 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
@@ -18,20 +18,23 @@ import com.yahoo.vespa.filedistribution.FileDistributionConnectionPool;
import com.yahoo.vespa.filedistribution.FileDownloader;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;
import com.yahoo.vespa.flags.FlagSource;
-
+import com.yahoo.vespa.flags.Flags;
import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
/**
* Verifies that all active sessions has an application package on local disk.
* If not, the package is downloaded with file distribution. This can happen e.g.
- * if a configserver is down when the application is deployed.
+ * if a config server is down when the application is deployed.
*
* @author gjoranv
*/
@@ -53,7 +56,10 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
this.applicationRepository = applicationRepository;
this.configserverConfig = applicationRepository.configserverConfig();
this.downloadDirectory = new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir()));
- this.fileDownloader = createFileDownloader(configserverConfig, downloadDirectory, supervisor);
+ this.fileDownloader = createFileDownloader(configserverConfig,
+ downloadDirectory,
+ supervisor,
+ Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value());
}
@Override
@@ -94,14 +100,18 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
private static FileDownloader createFileDownloader(ConfigserverConfig configserverConfig,
File downloadDirectory,
- Supervisor supervisor) {
+ Supervisor supervisor,
+ List<String> flagValues) {
List<String> otherConfigServersInCluster = getOtherConfigServersInCluster(configserverConfig);
ConfigSourceSet configSourceSet = new ConfigSourceSet(otherConfigServersInCluster);
ConnectionPool connectionPool = (otherConfigServersInCluster.isEmpty())
? FileDownloader.emptyConnectionPool()
: new FileDistributionConnectionPool(configSourceSet, supervisor);
- return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300));
+ Set<CompressionType> acceptedCompressionTypes = flagValues.stream()
+ .map(CompressionType::valueOf)
+ .collect(Collectors.toSet());
+ return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300), acceptedCompressionTypes);
}
@Override
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index 687cb1d3cca..7993e58d06e 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -491,6 +491,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
sendParts(session, fileData);
sendEof(session, fileData, status);
}
+
private void sendParts(int session, FileReferenceData fileData) {
ByteBuffer bb = ByteBuffer.allocate(0x100000);
for (int partId = 0, read = fileData.nextContent(bb); read >= 0; partId++, read = fileData.nextContent(bb)) {
@@ -504,12 +505,9 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
bb.clear();
}
}
+
private int sendMeta(FileReferenceData fileData) {
- Request request = new Request(FileReceiver.RECEIVE_META_METHOD);
- request.parameters().add(new StringValue(fileData.fileReference().value()));
- request.parameters().add(new StringValue(fileData.filename()));
- request.parameters().add(new StringValue(fileData.type().name()));
- request.parameters().add(new Int64Value(fileData.size()));
+ Request request = createMetaRequest(fileData);
invokeRpcIfValidConnection(request);
if (request.isError()) {
log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
@@ -522,6 +520,20 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
return request.returnValues().get(1).asInt32();
}
}
+
+ // non-private for testing
+ static Request createMetaRequest(FileReferenceData fileData) {
+ Request request = new Request(FileReceiver.RECEIVE_META_METHOD);
+ request.parameters().add(new StringValue(fileData.fileReference().value()));
+ request.parameters().add(new StringValue(fileData.filename()));
+ request.parameters().add(new StringValue(fileData.type().name()));
+ request.parameters().add(new Int64Value(fileData.size()));
+ // Only add paramter if not gzip, this is default and old clients will not handle the extra parameter
+ if (fileData.compressionType() != CompressionType.gzip)
+ request.parameters().add(new StringValue(fileData.compressionType().name()));
+ return request;
+ }
+
private void sendPart(int session, FileReference ref, int partId, byte [] buf) {
Request request = new Request(FileReceiver.RECEIVE_PART_METHOD);
request.parameters().add(new StringValue(ref.value()));
@@ -538,6 +550,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
}
}
}
+
private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) {
Request request = new Request(FileReceiver.RECEIVE_EOF_METHOD);
request.parameters().add(new StringValue(fileData.fileReference().value()));
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index a144940e443..d803488cb0a 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -44,13 +44,11 @@ import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.defaults.Defaults;
import com.yahoo.vespa.flags.FlagSource;
import com.yahoo.vespa.flags.Flags;
-import com.yahoo.vespa.flags.StringFlag;
import com.yahoo.yolean.Exceptions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.KeeperException;
-
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -83,6 +81,7 @@ import java.util.logging.Logger;
import java.util.stream.Collectors;
import static com.yahoo.vespa.curator.Curator.CompletionWaiter;
+import static com.yahoo.vespa.flags.FetchVector.Dimension.APPLICATION_ID;
import static java.nio.file.Files.readAttributes;
/**
@@ -128,7 +127,6 @@ public class SessionRepository {
private final ModelFactoryRegistry modelFactoryRegistry;
private final ConfigDefinitionRepo configDefinitionRepo;
private final int maxNodeSize;
- private final StringFlag failDeploymentForFilesWithUnknownExtension;
public SessionRepository(TenantName tenantName,
TenantApplications applicationRepo,
@@ -172,7 +170,6 @@ public class SessionRepository {
this.modelFactoryRegistry = modelFactoryRegistry;
this.configDefinitionRepo = configDefinitionRepo;
this.maxNodeSize = maxNodeSize;
- this.failDeploymentForFilesWithUnknownExtension = Flags.APPLICATION_FILES_WITH_UNKNOWN_EXTENSION.bindTo(flagSource);
loadSessions(); // Needs to be done before creating cache below
this.directoryCache = curator.createDirectoryCache(sessionsPath.getAbsolute(), false, false, zkCacheExecutor);
@@ -684,7 +681,10 @@ public class SessionRepository {
try {
app.validateFileExtensions();
} catch (IllegalArgumentException e) {
- switch (failDeploymentForFilesWithUnknownExtension.value()) {
+ String flag = Flags.APPLICATION_FILES_WITH_UNKNOWN_EXTENSION.bindTo(flagSource)
+ .with(APPLICATION_ID, applicationId.serializedForm())
+ .value();
+ switch (flag) {
case "FAIL":
throw e;
case "LOG":