summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java9
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java4
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java6
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java110
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/ActivatedModelsBuilder.java5
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionPreparer.java1
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java1
-rw-r--r--jdisc_core/src/main/java/com/yahoo/jdisc/application/DeactivatedContainer.java2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp30
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h27
-rw-r--r--vespalib/src/tests/portal/reactor/reactor_test.cpp46
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java9
-rw-r--r--zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java10
13 files changed, 155 insertions, 105 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index 44d729c802e..94b65a01e57 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -14,6 +14,8 @@ import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.Lock;
import com.yahoo.yolean.Exceptions;
+import java.io.Closeable;
+import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
@@ -29,7 +31,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap;
*
* @author jonmv
*/
-public class ReindexingCurator {
+public class ReindexingCurator implements Closeable {
private static final Logger log = Logger.getLogger(ReindexingCurator.class.getName());
@@ -94,6 +96,11 @@ public class ReindexingCurator {
private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); }
private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); }
+ @Override
+ public void close() {
+ curator.close();
+ }
+
private static class ReindexingSerializer {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index a9642c591ea..afb491fa293 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -103,9 +103,9 @@ public class ReindexingMaintainer extends AbstractComponent {
executor.awaitTermination(5, TimeUnit.SECONDS); // Give it 5s to complete gracefully.
- curator.close(); // Close the underlying curator independently to force shutdown
+ curator.close(); // Close the underlying curator independently to force shutdown.
- if ( !executor.isShutdown() && ! executor.awaitTermination(5, TimeUnit.SECONDS))
+ if ( ! executor.isShutdown() && ! executor.awaitTermination(5, TimeUnit.SECONDS))
log.log(WARNING, "Failed to shut down reindexing within timeout");
}
catch (InterruptedException e) {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
index 8d9d21999d5..7816686221b 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
@@ -65,6 +65,11 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
return ErrorResponse.notFoundError("Nothing at " + request.getUri().getRawPath());
}
+ @Override
+ public void destroy() {
+ database.close();
+ }
+
HttpResponse getRoot() {
Slime slime = new Slime();
slime.setObject().setArray("resources").addObject().setString("url", "/reindexing/v1/status");
@@ -88,7 +93,6 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler {
return new SlimeJsonResponse(slime);
}
-
private static String toString(Reindexing.State state) {
switch (state) {
case READY: return "pending";
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
index dec5cbb7ee6..75ee9d66b4a 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
@@ -212,54 +212,54 @@ public class ModelContextImpl implements ModelContext {
private final boolean mergeGroupingResultInSearchInvoker;
private final boolean experimentalSdParsing;
- public FeatureFlags(FlagSource source, ApplicationId appId) {
- this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT);
- this.useThreePhaseUpdates = flagValue(source, appId, Flags.USE_THREE_PHASE_UPDATES);
- this.feedSequencer = flagValue(source, appId, Flags.FEED_SEQUENCER_TYPE);
- this.feedTaskLimit = flagValue(source, appId, Flags.FEED_TASK_LIMIT);
- this.feedMasterTaskLimit = flagValue(source, appId, Flags.FEED_MASTER_TASK_LIMIT);
- this.sharedFieldWriterExecutor = flagValue(source, appId, Flags.SHARED_FIELD_WRITER_EXECUTOR);
- this.responseSequencer = flagValue(source, appId, Flags.RESPONSE_SEQUENCER_TYPE);
- this.numResponseThreads = flagValue(source, appId, Flags.RESPONSE_NUM_THREADS);
- this.skipCommunicationManagerThread = flagValue(source, appId, Flags.SKIP_COMMUNICATIONMANAGER_THREAD);
- this.skipMbusRequestThread = flagValue(source, appId, Flags.SKIP_MBUS_REQUEST_THREAD);
- this.skipMbusReplyThread = flagValue(source, appId, Flags.SKIP_MBUS_REPLY_THREAD);
- this.useAsyncMessageHandlingOnSchedule = flagValue(source, appId, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE);
- this.feedConcurrency = flagValue(source, appId, Flags.FEED_CONCURRENCY);
- this.allowedAthenzProxyIdentities = flagValue(source, appId, Flags.ALLOWED_ATHENZ_PROXY_IDENTITIES);
- this.maxActivationInhibitedOutOfSyncGroups = flagValue(source, appId, Flags.MAX_ACTIVATION_INHIBITED_OUT_OF_SYNC_GROUPS);
- this.jvmOmitStackTraceInFastThrow = type -> flagValueAsInt(source, appId, type, PermanentFlags.JVM_OMIT_STACK_TRACE_IN_FAST_THROW);
- this.maxConcurrentMergesPerContentNode = flagValue(source, appId, Flags.MAX_CONCURRENT_MERGES_PER_NODE);
- this.maxMergeQueueSize = flagValue(source, appId, Flags.MAX_MERGE_QUEUE_SIZE);
- this.resourceLimitDisk = flagValue(source, appId, PermanentFlags.RESOURCE_LIMIT_DISK);
- this.resourceLimitMemory = flagValue(source, appId, PermanentFlags.RESOURCE_LIMIT_MEMORY);
- this.minNodeRatioPerGroup = flagValue(source, appId, Flags.MIN_NODE_RATIO_PER_GROUP);
- this.metricsproxyNumThreads = flagValue(source, appId, Flags.METRICSPROXY_NUM_THREADS);
- this.availableProcessors = flagValue(source, appId, Flags.AVAILABLE_PROCESSORS);
- this.containerDumpHeapOnShutdownTimeout = flagValue(source, appId, Flags.CONTAINER_DUMP_HEAP_ON_SHUTDOWN_TIMEOUT);
- this.containerShutdownTimeout = flagValue(source, appId,Flags.CONTAINER_SHUTDOWN_TIMEOUT);
- this.maxUnCommittedMemory = flagValue(source, appId, Flags.MAX_UNCOMMITTED_MEMORY);
- this.forwardIssuesAsErrors = flagValue(source, appId, PermanentFlags.FORWARD_ISSUES_AS_ERRORS);
- this.ignoreThreadStackSizes = flagValue(source, appId, Flags.IGNORE_THREAD_STACK_SIZES);
- this.unorderedMergeChaining = flagValue(source, appId, Flags.UNORDERED_MERGE_CHAINING);
- this.useV8GeoPositions = flagValue(source, appId, Flags.USE_V8_GEO_POSITIONS);
- this.useV8DocManagerCfg = flagValue(source, appId, Flags.USE_V8_DOC_MANAGER_CFG);
- this.maxCompactBuffers = flagValue(source, appId, Flags.MAX_COMPACT_BUFFERS);
- this.failDeploymentWithInvalidJvmOptions = flagValue(source, appId, Flags.FAIL_DEPLOYMENT_WITH_INVALID_JVM_OPTIONS);
- this.ignoredHttpUserAgents = flagValue(source, appId, PermanentFlags.IGNORED_HTTP_USER_AGENTS);
- this.enableServerOcspStapling = flagValue(source, appId, Flags.ENABLE_SERVER_OCSP_STAPLING);
- this.persistenceAsyncThrottling = flagValue(source, appId, Flags.PERSISTENCE_ASYNC_THROTTLING);
- this.mergeThrottlingPolicy = flagValue(source, appId, Flags.MERGE_THROTTLING_POLICY);
- this.persistenceThrottlingWsDecrementFactor = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_WS_DECREMENT_FACTOR);
- this.persistenceThrottlingWsBackoff = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_WS_BACKOFF);
- this.persistenceThrottlingWindowSize = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_WINDOW_SIZE);
- this.persistenceThrottlingWsResizeRate = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_WS_RESIZE_RATE);
- this.persistenceThrottlingOfMergeFeedOps = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_OF_MERGE_FEED_OPS);
- this.inhibitDefaultMergesWhenGlobalMergesPending = flagValue(source, appId, Flags.INHIBIT_DEFAULT_MERGES_WHEN_GLOBAL_MERGES_PENDING);
- this.useQrserverServiceName = flagValue(source, appId, Flags.USE_QRSERVER_SERVICE_NAME);
- this.avoidRenamingSummaryFeatures = flagValue(source, appId, Flags.AVOID_RENAMING_SUMMARY_FEATURES);
- this.mergeGroupingResultInSearchInvoker = flagValue(source, appId, Flags.MERGE_GROUPING_RESULT_IN_SEARCH_INVOKER);
- this.experimentalSdParsing = flagValue(source, appId, Flags.EXPERIMENTAL_SD_PARSING);
+ public FeatureFlags(FlagSource source, ApplicationId appId, Version version) {
+ this.defaultTermwiseLimit = flagValue(source, appId, version, Flags.DEFAULT_TERM_WISE_LIMIT);
+ this.useThreePhaseUpdates = flagValue(source, appId, version, Flags.USE_THREE_PHASE_UPDATES);
+ this.feedSequencer = flagValue(source, appId, version, Flags.FEED_SEQUENCER_TYPE);
+ this.feedTaskLimit = flagValue(source, appId, version, Flags.FEED_TASK_LIMIT);
+ this.feedMasterTaskLimit = flagValue(source, appId, version, Flags.FEED_MASTER_TASK_LIMIT);
+ this.sharedFieldWriterExecutor = flagValue(source, appId, version, Flags.SHARED_FIELD_WRITER_EXECUTOR);
+ this.responseSequencer = flagValue(source, appId, version, Flags.RESPONSE_SEQUENCER_TYPE);
+ this.numResponseThreads = flagValue(source, appId, version, Flags.RESPONSE_NUM_THREADS);
+ this.skipCommunicationManagerThread = flagValue(source, appId, version, Flags.SKIP_COMMUNICATIONMANAGER_THREAD);
+ this.skipMbusRequestThread = flagValue(source, appId, version, Flags.SKIP_MBUS_REQUEST_THREAD);
+ this.skipMbusReplyThread = flagValue(source, appId, version, Flags.SKIP_MBUS_REPLY_THREAD);
+ this.useAsyncMessageHandlingOnSchedule = flagValue(source, appId, version, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE);
+ this.feedConcurrency = flagValue(source, appId, version, Flags.FEED_CONCURRENCY);
+ this.allowedAthenzProxyIdentities = flagValue(source, appId, version, Flags.ALLOWED_ATHENZ_PROXY_IDENTITIES);
+ this.maxActivationInhibitedOutOfSyncGroups = flagValue(source, appId, version, Flags.MAX_ACTIVATION_INHIBITED_OUT_OF_SYNC_GROUPS);
+ this.jvmOmitStackTraceInFastThrow = type -> flagValueAsInt(source, appId, version, type, PermanentFlags.JVM_OMIT_STACK_TRACE_IN_FAST_THROW);
+ this.maxConcurrentMergesPerContentNode = flagValue(source, appId, version, Flags.MAX_CONCURRENT_MERGES_PER_NODE);
+ this.maxMergeQueueSize = flagValue(source, appId, version, Flags.MAX_MERGE_QUEUE_SIZE);
+ this.resourceLimitDisk = flagValue(source, appId, version, PermanentFlags.RESOURCE_LIMIT_DISK);
+ this.resourceLimitMemory = flagValue(source, appId, version, PermanentFlags.RESOURCE_LIMIT_MEMORY);
+ this.minNodeRatioPerGroup = flagValue(source, appId, version, Flags.MIN_NODE_RATIO_PER_GROUP);
+ this.metricsproxyNumThreads = flagValue(source, appId, version, Flags.METRICSPROXY_NUM_THREADS);
+ this.availableProcessors = flagValue(source, appId, version, Flags.AVAILABLE_PROCESSORS);
+ this.containerDumpHeapOnShutdownTimeout = flagValue(source, appId, version, Flags.CONTAINER_DUMP_HEAP_ON_SHUTDOWN_TIMEOUT);
+ this.containerShutdownTimeout = flagValue(source, appId, version, Flags.CONTAINER_SHUTDOWN_TIMEOUT);
+ this.maxUnCommittedMemory = flagValue(source, appId, version, Flags.MAX_UNCOMMITTED_MEMORY);
+ this.forwardIssuesAsErrors = flagValue(source, appId, version, PermanentFlags.FORWARD_ISSUES_AS_ERRORS);
+ this.ignoreThreadStackSizes = flagValue(source, appId, version, Flags.IGNORE_THREAD_STACK_SIZES);
+ this.unorderedMergeChaining = flagValue(source, appId, version, Flags.UNORDERED_MERGE_CHAINING);
+ this.useV8GeoPositions = flagValue(source, appId, version, Flags.USE_V8_GEO_POSITIONS);
+ this.useV8DocManagerCfg = flagValue(source, appId, version, Flags.USE_V8_DOC_MANAGER_CFG);
+ this.maxCompactBuffers = flagValue(source, appId, version, Flags.MAX_COMPACT_BUFFERS);
+ this.failDeploymentWithInvalidJvmOptions = flagValue(source, appId, version, Flags.FAIL_DEPLOYMENT_WITH_INVALID_JVM_OPTIONS);
+ this.ignoredHttpUserAgents = flagValue(source, appId, version, PermanentFlags.IGNORED_HTTP_USER_AGENTS);
+ this.enableServerOcspStapling = flagValue(source, appId, version, Flags.ENABLE_SERVER_OCSP_STAPLING);
+ this.persistenceAsyncThrottling = flagValue(source, appId, version, Flags.PERSISTENCE_ASYNC_THROTTLING);
+ this.mergeThrottlingPolicy = flagValue(source, appId, version, Flags.MERGE_THROTTLING_POLICY);
+ this.persistenceThrottlingWsDecrementFactor = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_DECREMENT_FACTOR);
+ this.persistenceThrottlingWsBackoff = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_BACKOFF);
+ this.persistenceThrottlingWindowSize = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WINDOW_SIZE);
+ this.persistenceThrottlingWsResizeRate = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_RESIZE_RATE);
+ this.persistenceThrottlingOfMergeFeedOps = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_OF_MERGE_FEED_OPS);
+ this.inhibitDefaultMergesWhenGlobalMergesPending = flagValue(source, appId, version, Flags.INHIBIT_DEFAULT_MERGES_WHEN_GLOBAL_MERGES_PENDING);
+ this.useQrserverServiceName = flagValue(source, appId, version, Flags.USE_QRSERVER_SERVICE_NAME);
+ this.avoidRenamingSummaryFeatures = flagValue(source, appId, version, Flags.AVOID_RENAMING_SUMMARY_FEATURES);
+ this.mergeGroupingResultInSearchInvoker = flagValue(source, appId, version, Flags.MERGE_GROUPING_RESULT_IN_SEARCH_INVOKER);
+ this.experimentalSdParsing = flagValue(source, appId, version, Flags.EXPERIMENTAL_SD_PARSING);
}
@Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; }
@@ -312,33 +312,38 @@ public class ModelContextImpl implements ModelContext {
@Override public boolean mergeGroupingResultInSearchInvoker() { return mergeGroupingResultInSearchInvoker; }
@Override public boolean experimentalSdParsing() { return experimentalSdParsing; }
- private static <V> V flagValue(FlagSource source, ApplicationId appId, UnboundFlag<? extends V, ?, ?> flag) {
+ private static <V> V flagValue(FlagSource source, ApplicationId appId, Version vespaVersion, UnboundFlag<? extends V, ?, ?> flag) {
return flag.bindTo(source)
.with(FetchVector.Dimension.APPLICATION_ID, appId.serializedForm())
+ .with(FetchVector.Dimension.VESPA_VERSION, vespaVersion.toFullString())
.boxedValue();
}
- private static <V> V flagValue(FlagSource source, TenantName tenant, UnboundFlag<? extends V, ?, ?> flag) {
+ private static <V> V flagValue(FlagSource source, TenantName tenant, Version vespaVersion, UnboundFlag<? extends V, ?, ?> flag) {
return flag.bindTo(source)
.with(FetchVector.Dimension.TENANT_ID, tenant.value())
+ .with(FetchVector.Dimension.VESPA_VERSION, vespaVersion.toFullString())
.boxedValue();
}
private static <V> V flagValue(FlagSource source,
ApplicationId appId,
+ Version vespaVersion,
ClusterSpec.Type clusterType,
UnboundFlag<? extends V, ?, ?> flag) {
return flag.bindTo(source)
.with(FetchVector.Dimension.APPLICATION_ID, appId.serializedForm())
.with(FetchVector.Dimension.CLUSTER_TYPE, clusterType.name())
+ .with(FetchVector.Dimension.VESPA_VERSION, vespaVersion.toFullString())
.boxedValue();
}
static int flagValueAsInt(FlagSource source,
ApplicationId appId,
+ Version version,
ClusterSpec.Type clusterType,
UnboundFlag<? extends Boolean, ?, ?> flag) {
- return flagValue(source, appId, clusterType, flag) ? 1 : 0;
+ return flagValue(source, appId, version, clusterType, flag) ? 1 : 0;
}
private String translateJvmOmitStackTraceInFastThrowIntToString(ToIntFunction<ClusterSpec.Type> function,
@@ -375,6 +380,7 @@ public class ModelContextImpl implements ModelContext {
private final List<String> environmentVariables;
public Properties(ApplicationId applicationId,
+ Version nodeVespaVersion,
ConfigserverConfig configserverConfig,
Zone zone,
Set<ContainerEndpoint> endpoints,
@@ -387,7 +393,7 @@ public class ModelContextImpl implements ModelContext {
List<TenantSecretStore> tenantSecretStores,
SecretStore secretStore,
List<X509Certificate> operatorCertificates) {
- this.featureFlags = new FeatureFlags(flagSource, applicationId);
+ this.featureFlags = new FeatureFlags(flagSource, applicationId, nodeVespaVersion);
this.applicationId = applicationId;
this.multitenant = configserverConfig.multitenant() || configserverConfig.hostedVespa() || Boolean.getBoolean("multitenant");
this.configServerSpecs = fromConfig(configserverConfig);
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/ActivatedModelsBuilder.java b/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/ActivatedModelsBuilder.java
index 54ceb394ee6..f2d9eb835be 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/ActivatedModelsBuilder.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/ActivatedModelsBuilder.java
@@ -102,7 +102,7 @@ public class ActivatedModelsBuilder extends ModelsBuilder<Application> {
) {
log.log(Level.FINE, () -> String.format("Loading model version %s for session %s application %s",
modelFactory.version(), applicationGeneration, applicationId));
- ModelContext.Properties modelContextProperties = createModelContextProperties(applicationId, applicationPackage);
+ ModelContext.Properties modelContextProperties = createModelContextProperties(applicationId, wantedNodeVespaVersion, applicationPackage);
Provisioned provisioned = new Provisioned();
ModelContext modelContext = new ModelContextImpl(
applicationPackage,
@@ -146,8 +146,9 @@ public class ActivatedModelsBuilder extends ModelsBuilder<Application> {
return Optional.of(value);
}
- private ModelContext.Properties createModelContextProperties(ApplicationId applicationId, ApplicationPackage applicationPackage) {
+ private ModelContext.Properties createModelContextProperties(ApplicationId applicationId, Version wantedNodeVespaVersion, ApplicationPackage applicationPackage) {
return new ModelContextImpl.Properties(applicationId,
+ wantedNodeVespaVersion,
configserverConfig,
zone(),
ImmutableSet.copyOf(new ContainerEndpointsCache(TenantRepository.getTenantPath(tenant), curator).read(applicationId)),
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionPreparer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionPreparer.java
index f8ee3e5e0c9..8293871335d 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionPreparer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionPreparer.java
@@ -191,6 +191,7 @@ public class SessionPreparer {
this.containerEndpoints = readEndpointsIfNull(params.containerEndpoints());
this.athenzDomain = params.athenzDomain();
this.properties = new ModelContextImpl.Properties(params.getApplicationId(),
+ vespaVersion,
configserverConfig,
zone,
Set.copyOf(containerEndpoints),
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java
index 659bc771ac7..41e6fe98441 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java
@@ -63,6 +63,7 @@ public class ModelContextImplTest {
new Provisioned(),
new ModelContextImpl.Properties(
ApplicationId.defaultId(),
+ Version.emptyVersion,
configserverConfig,
Zone.defaultZone(),
endpoints,
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/application/DeactivatedContainer.java b/jdisc_core/src/main/java/com/yahoo/jdisc/application/DeactivatedContainer.java
index f85f626d4b7..fa749d924d1 100644
--- a/jdisc_core/src/main/java/com/yahoo/jdisc/application/DeactivatedContainer.java
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/application/DeactivatedContainer.java
@@ -29,7 +29,7 @@ public interface DeactivatedContainer {
* DeactivatedContainer is considered to have terminated once there are no more {@link Request}s, {@link Response}s
* or corresponding {@link ContentChannel}s being processed by components that belong to it.</p>
*
- * <p>If termination has already occured, this method immediately runs the given Runnable in the current thread.</p>
+ * <p>If termination has already occurred, this method immediately runs the given Runnable in the current thread.</p>
*
* @param task The task to run once this DeactivatedContainer has terminated.
*/
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
index 20aba93d4d7..2e53db3bd87 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -490,7 +490,7 @@ IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index)
args->old_absolute_id = _current_index_id + _last_fusion_id;
args->old_source_list = _source_list;
string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id));
- args->flush_serial_num = _current_serial_num;
+ args->flush_serial_num = current_serial_num();
{
LockGuard lock(_index_update_lock);
// Handover of extra memory indexes to flush
@@ -795,7 +795,7 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex)
args._oldSourceList = _source_list; // Delay destruction
uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id;
string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId));
- SerialNum freezeSerialNum = _current_serial_num;
+ SerialNum freezeSerialNum = current_serial_num();
bool dropEmptyLast = false;
SaveInfo::UP saveInfo;
@@ -921,7 +921,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
_flush_serial_num = IndexReadUtilities::readSerialNum(latest_index_dir);
_lastFlushTime = search::FileKit::getModificationTime(latest_index_dir);
- _current_serial_num = _flush_serial_num;
+ set_current_serial_num(_flush_serial_num);
const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir);
_selector = FixedSourceSelector::load(selector, _next_id - 1);
} else {
@@ -941,7 +941,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
_selector->setDefaultSource(_current_index_id);
auto sourceList = loadDiskIndexes(spec, std::make_unique<IndexCollection>(_selector));
- _current_index = operations.createMemoryIndex(_schema, *sourceList, _current_serial_num);
+ _current_index = operations.createMemoryIndex(_schema, *sourceList, current_serial_num());
LOG(debug, "Index manager created with flushed serial num %" PRIu64, _flush_serial_num);
sourceList->append(_current_index_id, _current_index);
sourceList->setCurrentIndex(_current_index_id);
@@ -966,10 +966,10 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat
assert(_ctx.getThreadingService().master().isCurrentThread()); // while flush engine scheduler thread waits
{
LockGuard lock(_index_update_lock);
- _current_serial_num = std::max(_current_serial_num, serialNum);
+ set_current_serial_num(std::max(current_serial_num(), serialNum));
}
- IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, _current_serial_num));
+ IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num()));
FlushArgs args;
args.stats = stats;
// Ensure that all index thread tasks accessing memory index have completed.
@@ -984,7 +984,7 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat
if (args._skippedEmptyLast && args._extraIndexes.empty()) {
// No memory index to flush, it was empty
LockGuard lock(_state_lock);
- _flush_serial_num = _current_serial_num;
+ _flush_serial_num = current_serial_num();
_lastFlushTime = vespalib::system_clock::now();
LOG(debug, "No memory index to flush. Update serial number and flush time to current: "
"flushSerialNum(%" PRIu64 "), lastFlushTime(%f)",
@@ -1014,7 +1014,7 @@ IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushTok
// XXX: Claims to have flushed memory index when starting fusion.
{
LockGuard lock(_index_update_lock);
- _current_serial_num = std::max(_current_serial_num, serialNum);
+ set_current_serial_num(std::max(current_serial_num(), serialNum));
}
FusionSpec spec;
@@ -1234,7 +1234,7 @@ IndexMaintainer::putDocument(uint32_t lid, const Document &doc, SerialNum serial
_selector->setSource(lid, _current_index_id);
_source_list->setSource(lid);
++_source_selector_changes;
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
}
void
@@ -1247,7 +1247,7 @@ IndexMaintainer::removeDocuments(LidVector lids, SerialNum serialNum)
_source_list->setSource(lid);
}
_source_selector_changes += lids.size();
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
_current_index->removeDocuments(std::move(lids));
}
@@ -1267,7 +1267,7 @@ IndexMaintainer::commit(vespalib::Gate& gate)
// only triggered via commit_and_wait()
assert(_ctx.getThreadingService().index().isCurrentThread());
LockGuard lock(_index_update_lock);
- _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), _current_serial_num);
+ _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), current_serial_num());
}
void
@@ -1275,7 +1275,7 @@ IndexMaintainer::commit(SerialNum serialNum, OnWriteDoneType onWriteDone)
{
assert(_ctx.getThreadingService().index().isCurrentThread());
LockGuard lock(_index_update_lock);
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
_current_index->commit(onWriteDone, serialNum);
}
@@ -1284,7 +1284,7 @@ IndexMaintainer::heartBeat(SerialNum serialNum)
{
assert(_ctx.getThreadingService().index().isCurrentThread());
LockGuard lock(_index_update_lock);
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
}
void
@@ -1293,7 +1293,7 @@ IndexMaintainer::compactLidSpace(uint32_t lidLimit, SerialNum serialNum)
assert(_ctx.getThreadingService().index().isCurrentThread());
LOG(info, "compactLidSpace(%u, %" PRIu64 ")", lidLimit, serialNum);
LockGuard lock(_index_update_lock);
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
_selector->compactLidSpace(lidLimit);
}
@@ -1313,7 +1313,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum)
{
assert(_ctx.getThreadingService().master().isCurrentThread());
pruneRemovedFields(schema, serialNum);
- IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, _current_serial_num));
+ IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num()));
SetSchemaArgs args;
args._newSchema = schema;
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
index b4bba209937..dfae5b4d643 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -18,6 +18,7 @@
#include <vespa/searchcorespi/flush/flushstats.h>
#include <vespa/searchlib/attribute/fixedsourceselector.h>
#include <vespa/searchlib/common/serialnum.h>
+#include <atomic>
namespace document { class Document; }
@@ -87,14 +88,14 @@ class IndexMaintainer : public IIndexManager,
// _selector is protected by SL + IUL
ISourceSelector::SP _selector;
ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL, only set by master thread
- uint32_t _last_fusion_id; // Protected by SL + IUL
- uint32_t _next_id; // Protected by SL + IUL
- uint32_t _current_index_id; // Protected by SL + IUL
- IMemoryIndex::SP _current_index; // Protected by SL + IUL
- bool _flush_empty_current_index;
- SerialNum _current_serial_num;// Protected by IUL
- SerialNum _flush_serial_num; // Protected by SL
- vespalib::system_time _lastFlushTime; // Protected by SL
+ uint32_t _last_fusion_id; // Protected by SL + IUL
+ uint32_t _next_id; // Protected by SL + IUL
+ uint32_t _current_index_id; // Protected by SL + IUL
+ IMemoryIndex::SP _current_index; // Protected by SL + IUL
+ bool _flush_empty_current_index;
+ std::atomic<SerialNum> _current_serial_num;// Protected by IUL
+ SerialNum _flush_serial_num; // Protected by SL
+ vespalib::system_time _lastFlushTime; // Protected by SL
// Extra frozen memory indexes. This list is empty unless new
// memory index has been added by force (due to config change or
// data structure limitations).
@@ -263,6 +264,12 @@ class IndexMaintainer : public IIndexManager,
void commit_and_wait();
void commit(vespalib::Gate& gate);
void pruneRemovedFields(const Schema &schema, SerialNum serialNum);
+ [[nodiscard]] SerialNum current_serial_num() const noexcept {
+ return _current_serial_num.load(std::memory_order_relaxed);
+ }
+ void set_current_serial_num(SerialNum new_serial_num) noexcept {
+ _current_serial_num.store(new_serial_num, std::memory_order_relaxed);
+ }
public:
IndexMaintainer(const IndexMaintainer &) = delete;
@@ -270,7 +277,7 @@ public:
IndexMaintainer(const IndexMaintainerConfig &config,
const IndexMaintainerContext &context,
IIndexMaintainerOperations &operations);
- ~IndexMaintainer();
+ ~IndexMaintainer() override;
/**
* Starts a new MemoryIndex, and dumps the previous one to disk.
@@ -333,7 +340,7 @@ public:
void compactLidSpace(uint32_t lidLimit, SerialNum serialNum) override;
SerialNum getCurrentSerialNum() const override {
- return _current_serial_num;
+ return _current_serial_num.load(std::memory_order_relaxed);
}
SerialNum getFlushedSerialNum() const override {
diff --git a/vespalib/src/tests/portal/reactor/reactor_test.cpp b/vespalib/src/tests/portal/reactor/reactor_test.cpp
index 82d4d8d3794..d7d61f9ee79 100644
--- a/vespalib/src/tests/portal/reactor/reactor_test.cpp
+++ b/vespalib/src/tests/portal/reactor/reactor_test.cpp
@@ -42,15 +42,13 @@ void wait_tick() {
}
}
-struct SimpleHandler : Reactor::EventHandler {
+struct HandlerBase : Reactor::EventHandler {
SocketPair sockets;
std::atomic<size_t> read_cnt;
std::atomic<size_t> write_cnt;
- Reactor::Token::UP token;
- SimpleHandler(Reactor &reactor, bool read, bool write)
- : sockets(), read_cnt(0), write_cnt(0), token()
+ HandlerBase()
+ : sockets(), read_cnt(0), write_cnt(0)
{
- token = reactor.attach(*this, sockets.main.get(), read, write);
}
void handle_event(bool read, bool write) override {
if (read) {
@@ -68,16 +66,33 @@ struct SimpleHandler : Reactor::EventHandler {
EXPECT_EQUAL((read_sample != read_cnt), read);
EXPECT_EQUAL((write_sample != write_cnt), write);
}
+ ~HandlerBase();
+};
+HandlerBase::~HandlerBase() = default;
+
+struct SimpleHandler : HandlerBase {
+ Reactor::Token::UP token;
+ SimpleHandler(Reactor &reactor, bool read, bool write)
+ : HandlerBase(), token()
+ {
+ token = reactor.attach(*this, sockets.main.get(), read, write);
+ }
~SimpleHandler();
};
SimpleHandler::~SimpleHandler() = default;
-struct DeletingHandler : SimpleHandler {
+struct DeletingHandler : HandlerBase {
+ Gate allow_delete;
Gate token_deleted;
- DeletingHandler(Reactor &reactor) : SimpleHandler(reactor, true, true),
- token_deleted() {}
+ Reactor::Token::UP token;
+ DeletingHandler(Reactor &reactor)
+ : HandlerBase(), allow_delete(), token_deleted(), token()
+ {
+ token = reactor.attach(*this, sockets.main.get(), true, true);
+ }
void handle_event(bool read, bool write) override {
- SimpleHandler::handle_event(read, write);
+ HandlerBase::handle_event(read, write);
+ allow_delete.await();
token.reset();
token_deleted.countDown();
}
@@ -85,14 +100,18 @@ struct DeletingHandler : SimpleHandler {
};
DeletingHandler::~DeletingHandler() = default;
-struct WaitingHandler : SimpleHandler {
+struct WaitingHandler : HandlerBase {
Gate enter_callback;
Gate exit_callback;
- WaitingHandler(Reactor &reactor) : SimpleHandler(reactor, true, true),
- enter_callback(), exit_callback() {}
+ Reactor::Token::UP token;
+ WaitingHandler(Reactor &reactor)
+ : HandlerBase(), enter_callback(), exit_callback(), token()
+ {
+ token = reactor.attach(*this, sockets.main.get(), true, true);
+ }
void handle_event(bool read, bool write) override {
enter_callback.countDown();
- SimpleHandler::handle_event(read, write);
+ HandlerBase::handle_event(read, write);
exit_callback.await();
}
~WaitingHandler();
@@ -135,6 +154,7 @@ TEST_FF("require that deleting reactor token disables io events", Reactor(tick),
TEST_FF("require that reactor token can be destroyed during io event handling", Reactor(tick), TimeBomb(60)) {
DeletingHandler handler(f1);
+ handler.allow_delete.countDown();
handler.token_deleted.await();
TEST_DO(handler.verify(false, false));
EXPECT_EQUAL(handler.read_cnt, 1u);
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index d226cfcd43e..2b5395773ee 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.curator;
import com.google.inject.Inject;
import com.yahoo.cloud.config.CuratorConfig;
+import com.yahoo.component.AbstractComponent;
import com.yahoo.path.Path;
import com.yahoo.vespa.curator.api.VespaCurator;
import com.yahoo.vespa.curator.recipes.CuratorCounter;
@@ -49,7 +50,7 @@ import java.util.logging.Logger;
* @author vegardh
* @author bratseth
*/
-public class Curator implements VespaCurator, AutoCloseable {
+public class Curator extends AbstractComponent implements VespaCurator, AutoCloseable {
private static final Logger LOG = Logger.getLogger(Curator.class.getName());
private static final File ZK_CLIENT_CONFIG_FILE = new File(Defaults.getDefaults().underVespaHome("conf/zookeeper/zookeeper-client.cfg"));
@@ -82,7 +83,6 @@ public class Curator implements VespaCurator, AutoCloseable {
}
@Inject
- // TODO jonmv: Use a Provider for this, due to required shutdown.
public Curator(CuratorConfig curatorConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) {
// Depends on ZooKeeperServer to make sure it is started first
this(ConnectionSpec.create(curatorConfig.server(),
@@ -316,6 +316,11 @@ public class Curator implements VespaCurator, AutoCloseable {
curatorFramework.close();
}
+ @Override
+ public void deconstruct() {
+ close();
+ }
+
/**
* Interface for waiting for completion of an operation
*/
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
index 604419c063d..389df340ca7 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
@@ -29,8 +29,7 @@ public class Reconfigurer extends AbstractComponent {
private static final Logger log = java.util.logging.Logger.getLogger(Reconfigurer.class.getName());
- private static final Duration MIN_TIMEOUT = Duration.ofMinutes(3);
- private static final Duration NODE_TIMEOUT = Duration.ofMinutes(1);
+ private static final Duration TIMEOUT = Duration.ofMinutes(3);
private final ExponentialBackoff backoff = new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10));
private final VespaZooKeeperAdmin vespaZooKeeperAdmin;
@@ -95,7 +94,7 @@ public class Reconfigurer extends AbstractComponent {
"\nServers in new config:" + servers(newConfig));
String connectionSpec = localConnectionSpec(activeConfig);
Instant now = Instant.now();
- Duration reconfigTimeout = reconfigTimeout(newConfig.server().size());
+ Duration reconfigTimeout = reconfigTimeout();
Instant end = now.plus(reconfigTimeout);
// Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed
for (int attempt = 1; now.isBefore(end); attempt++) {
@@ -129,11 +128,10 @@ public class Reconfigurer extends AbstractComponent {
Process.logAndDie("Reconfiguration did not complete within timeout " + reconfigTimeout + ". Forcing container shutdown.");
}
- /** Returns the timeout to use for the given joining server count */
- private static Duration reconfigTimeout(int joiningServers) {
+ private static Duration reconfigTimeout() {
// For reconfig to succeed, the current and resulting ensembles must have a majority. When an ensemble grows and
// the joining servers outnumber the existing ones, we have to wait for enough of them to start to have a majority.
- return Duration.ofMillis(Math.max(joiningServers * NODE_TIMEOUT.toMillis(), MIN_TIMEOUT.toMillis()));
+ return TIMEOUT;
}
private static String localConnectionSpec(ZookeeperServerConfig config) {