diff options
13 files changed, 155 insertions, 105 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java index 44d729c802e..94b65a01e57 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -14,6 +14,8 @@ import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.curator.Lock; import com.yahoo.yolean.Exceptions; +import java.io.Closeable; +import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.List; @@ -29,7 +31,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap; * * @author jonmv */ -public class ReindexingCurator { +public class ReindexingCurator implements Closeable { private static final Logger log = Logger.getLogger(ReindexingCurator.class.getName()); @@ -94,6 +96,11 @@ public class ReindexingCurator { private Path statusPath(String clusterName) { return rootPath(clusterName).append("status"); } private Path lockPath(String clusterName) { return rootPath(clusterName).append("lock"); } + @Override + public void close() { + curator.close(); + } + private static class ReindexingSerializer { diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index a9642c591ea..afb491fa293 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -103,9 +103,9 @@ public class ReindexingMaintainer extends AbstractComponent { executor.awaitTermination(5, TimeUnit.SECONDS); // Give it 5s to complete gracefully. - curator.close(); // Close the underlying curator independently to force shutdown + curator.close(); // Close the underlying curator independently to force shutdown. - if ( !executor.isShutdown() && ! executor.awaitTermination(5, TimeUnit.SECONDS)) + if ( ! executor.isShutdown() && ! executor.awaitTermination(5, TimeUnit.SECONDS)) log.log(WARNING, "Failed to shut down reindexing within timeout"); } catch (InterruptedException e) { diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java index 8d9d21999d5..7816686221b 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java @@ -65,6 +65,11 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler { return ErrorResponse.notFoundError("Nothing at " + request.getUri().getRawPath()); } + @Override + public void destroy() { + database.close(); + } + HttpResponse getRoot() { Slime slime = new Slime(); slime.setObject().setArray("resources").addObject().setString("url", "/reindexing/v1/status"); @@ -88,7 +93,6 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler { return new SlimeJsonResponse(slime); } - private static String toString(Reindexing.State state) { switch (state) { case READY: return "pending"; diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index dec5cbb7ee6..75ee9d66b4a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -212,54 +212,54 @@ public class ModelContextImpl implements ModelContext { private final boolean mergeGroupingResultInSearchInvoker; private final boolean experimentalSdParsing; - public FeatureFlags(FlagSource source, ApplicationId appId) { - this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT); - this.useThreePhaseUpdates = flagValue(source, appId, Flags.USE_THREE_PHASE_UPDATES); - this.feedSequencer = flagValue(source, appId, Flags.FEED_SEQUENCER_TYPE); - this.feedTaskLimit = flagValue(source, appId, Flags.FEED_TASK_LIMIT); - this.feedMasterTaskLimit = flagValue(source, appId, Flags.FEED_MASTER_TASK_LIMIT); - this.sharedFieldWriterExecutor = flagValue(source, appId, Flags.SHARED_FIELD_WRITER_EXECUTOR); - this.responseSequencer = flagValue(source, appId, Flags.RESPONSE_SEQUENCER_TYPE); - this.numResponseThreads = flagValue(source, appId, Flags.RESPONSE_NUM_THREADS); - this.skipCommunicationManagerThread = flagValue(source, appId, Flags.SKIP_COMMUNICATIONMANAGER_THREAD); - this.skipMbusRequestThread = flagValue(source, appId, Flags.SKIP_MBUS_REQUEST_THREAD); - this.skipMbusReplyThread = flagValue(source, appId, Flags.SKIP_MBUS_REPLY_THREAD); - this.useAsyncMessageHandlingOnSchedule = flagValue(source, appId, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE); - this.feedConcurrency = flagValue(source, appId, Flags.FEED_CONCURRENCY); - this.allowedAthenzProxyIdentities = flagValue(source, appId, Flags.ALLOWED_ATHENZ_PROXY_IDENTITIES); - this.maxActivationInhibitedOutOfSyncGroups = flagValue(source, appId, Flags.MAX_ACTIVATION_INHIBITED_OUT_OF_SYNC_GROUPS); - this.jvmOmitStackTraceInFastThrow = type -> flagValueAsInt(source, appId, type, PermanentFlags.JVM_OMIT_STACK_TRACE_IN_FAST_THROW); - this.maxConcurrentMergesPerContentNode = flagValue(source, appId, Flags.MAX_CONCURRENT_MERGES_PER_NODE); - this.maxMergeQueueSize = flagValue(source, appId, Flags.MAX_MERGE_QUEUE_SIZE); - this.resourceLimitDisk = flagValue(source, appId, PermanentFlags.RESOURCE_LIMIT_DISK); - this.resourceLimitMemory = flagValue(source, appId, PermanentFlags.RESOURCE_LIMIT_MEMORY); - this.minNodeRatioPerGroup = flagValue(source, appId, Flags.MIN_NODE_RATIO_PER_GROUP); - this.metricsproxyNumThreads = flagValue(source, appId, Flags.METRICSPROXY_NUM_THREADS); - this.availableProcessors = flagValue(source, appId, Flags.AVAILABLE_PROCESSORS); - this.containerDumpHeapOnShutdownTimeout = flagValue(source, appId, Flags.CONTAINER_DUMP_HEAP_ON_SHUTDOWN_TIMEOUT); - this.containerShutdownTimeout = flagValue(source, appId,Flags.CONTAINER_SHUTDOWN_TIMEOUT); - this.maxUnCommittedMemory = flagValue(source, appId, Flags.MAX_UNCOMMITTED_MEMORY); - this.forwardIssuesAsErrors = flagValue(source, appId, PermanentFlags.FORWARD_ISSUES_AS_ERRORS); - this.ignoreThreadStackSizes = flagValue(source, appId, Flags.IGNORE_THREAD_STACK_SIZES); - this.unorderedMergeChaining = flagValue(source, appId, Flags.UNORDERED_MERGE_CHAINING); - this.useV8GeoPositions = flagValue(source, appId, Flags.USE_V8_GEO_POSITIONS); - this.useV8DocManagerCfg = flagValue(source, appId, Flags.USE_V8_DOC_MANAGER_CFG); - this.maxCompactBuffers = flagValue(source, appId, Flags.MAX_COMPACT_BUFFERS); - this.failDeploymentWithInvalidJvmOptions = flagValue(source, appId, Flags.FAIL_DEPLOYMENT_WITH_INVALID_JVM_OPTIONS); - this.ignoredHttpUserAgents = flagValue(source, appId, PermanentFlags.IGNORED_HTTP_USER_AGENTS); - this.enableServerOcspStapling = flagValue(source, appId, Flags.ENABLE_SERVER_OCSP_STAPLING); - this.persistenceAsyncThrottling = flagValue(source, appId, Flags.PERSISTENCE_ASYNC_THROTTLING); - this.mergeThrottlingPolicy = flagValue(source, appId, Flags.MERGE_THROTTLING_POLICY); - this.persistenceThrottlingWsDecrementFactor = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_WS_DECREMENT_FACTOR); - this.persistenceThrottlingWsBackoff = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_WS_BACKOFF); - this.persistenceThrottlingWindowSize = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_WINDOW_SIZE); - this.persistenceThrottlingWsResizeRate = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_WS_RESIZE_RATE); - this.persistenceThrottlingOfMergeFeedOps = flagValue(source, appId, Flags.PERSISTENCE_THROTTLING_OF_MERGE_FEED_OPS); - this.inhibitDefaultMergesWhenGlobalMergesPending = flagValue(source, appId, Flags.INHIBIT_DEFAULT_MERGES_WHEN_GLOBAL_MERGES_PENDING); - this.useQrserverServiceName = flagValue(source, appId, Flags.USE_QRSERVER_SERVICE_NAME); - this.avoidRenamingSummaryFeatures = flagValue(source, appId, Flags.AVOID_RENAMING_SUMMARY_FEATURES); - this.mergeGroupingResultInSearchInvoker = flagValue(source, appId, Flags.MERGE_GROUPING_RESULT_IN_SEARCH_INVOKER); - this.experimentalSdParsing = flagValue(source, appId, Flags.EXPERIMENTAL_SD_PARSING); + public FeatureFlags(FlagSource source, ApplicationId appId, Version version) { + this.defaultTermwiseLimit = flagValue(source, appId, version, Flags.DEFAULT_TERM_WISE_LIMIT); + this.useThreePhaseUpdates = flagValue(source, appId, version, Flags.USE_THREE_PHASE_UPDATES); + this.feedSequencer = flagValue(source, appId, version, Flags.FEED_SEQUENCER_TYPE); + this.feedTaskLimit = flagValue(source, appId, version, Flags.FEED_TASK_LIMIT); + this.feedMasterTaskLimit = flagValue(source, appId, version, Flags.FEED_MASTER_TASK_LIMIT); + this.sharedFieldWriterExecutor = flagValue(source, appId, version, Flags.SHARED_FIELD_WRITER_EXECUTOR); + this.responseSequencer = flagValue(source, appId, version, Flags.RESPONSE_SEQUENCER_TYPE); + this.numResponseThreads = flagValue(source, appId, version, Flags.RESPONSE_NUM_THREADS); + this.skipCommunicationManagerThread = flagValue(source, appId, version, Flags.SKIP_COMMUNICATIONMANAGER_THREAD); + this.skipMbusRequestThread = flagValue(source, appId, version, Flags.SKIP_MBUS_REQUEST_THREAD); + this.skipMbusReplyThread = flagValue(source, appId, version, Flags.SKIP_MBUS_REPLY_THREAD); + this.useAsyncMessageHandlingOnSchedule = flagValue(source, appId, version, Flags.USE_ASYNC_MESSAGE_HANDLING_ON_SCHEDULE); + this.feedConcurrency = flagValue(source, appId, version, Flags.FEED_CONCURRENCY); + this.allowedAthenzProxyIdentities = flagValue(source, appId, version, Flags.ALLOWED_ATHENZ_PROXY_IDENTITIES); + this.maxActivationInhibitedOutOfSyncGroups = flagValue(source, appId, version, Flags.MAX_ACTIVATION_INHIBITED_OUT_OF_SYNC_GROUPS); + this.jvmOmitStackTraceInFastThrow = type -> flagValueAsInt(source, appId, version, type, PermanentFlags.JVM_OMIT_STACK_TRACE_IN_FAST_THROW); + this.maxConcurrentMergesPerContentNode = flagValue(source, appId, version, Flags.MAX_CONCURRENT_MERGES_PER_NODE); + this.maxMergeQueueSize = flagValue(source, appId, version, Flags.MAX_MERGE_QUEUE_SIZE); + this.resourceLimitDisk = flagValue(source, appId, version, PermanentFlags.RESOURCE_LIMIT_DISK); + this.resourceLimitMemory = flagValue(source, appId, version, PermanentFlags.RESOURCE_LIMIT_MEMORY); + this.minNodeRatioPerGroup = flagValue(source, appId, version, Flags.MIN_NODE_RATIO_PER_GROUP); + this.metricsproxyNumThreads = flagValue(source, appId, version, Flags.METRICSPROXY_NUM_THREADS); + this.availableProcessors = flagValue(source, appId, version, Flags.AVAILABLE_PROCESSORS); + this.containerDumpHeapOnShutdownTimeout = flagValue(source, appId, version, Flags.CONTAINER_DUMP_HEAP_ON_SHUTDOWN_TIMEOUT); + this.containerShutdownTimeout = flagValue(source, appId, version, Flags.CONTAINER_SHUTDOWN_TIMEOUT); + this.maxUnCommittedMemory = flagValue(source, appId, version, Flags.MAX_UNCOMMITTED_MEMORY); + this.forwardIssuesAsErrors = flagValue(source, appId, version, PermanentFlags.FORWARD_ISSUES_AS_ERRORS); + this.ignoreThreadStackSizes = flagValue(source, appId, version, Flags.IGNORE_THREAD_STACK_SIZES); + this.unorderedMergeChaining = flagValue(source, appId, version, Flags.UNORDERED_MERGE_CHAINING); + this.useV8GeoPositions = flagValue(source, appId, version, Flags.USE_V8_GEO_POSITIONS); + this.useV8DocManagerCfg = flagValue(source, appId, version, Flags.USE_V8_DOC_MANAGER_CFG); + this.maxCompactBuffers = flagValue(source, appId, version, Flags.MAX_COMPACT_BUFFERS); + this.failDeploymentWithInvalidJvmOptions = flagValue(source, appId, version, Flags.FAIL_DEPLOYMENT_WITH_INVALID_JVM_OPTIONS); + this.ignoredHttpUserAgents = flagValue(source, appId, version, PermanentFlags.IGNORED_HTTP_USER_AGENTS); + this.enableServerOcspStapling = flagValue(source, appId, version, Flags.ENABLE_SERVER_OCSP_STAPLING); + this.persistenceAsyncThrottling = flagValue(source, appId, version, Flags.PERSISTENCE_ASYNC_THROTTLING); + this.mergeThrottlingPolicy = flagValue(source, appId, version, Flags.MERGE_THROTTLING_POLICY); + this.persistenceThrottlingWsDecrementFactor = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_DECREMENT_FACTOR); + this.persistenceThrottlingWsBackoff = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_BACKOFF); + this.persistenceThrottlingWindowSize = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WINDOW_SIZE); + this.persistenceThrottlingWsResizeRate = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_WS_RESIZE_RATE); + this.persistenceThrottlingOfMergeFeedOps = flagValue(source, appId, version, Flags.PERSISTENCE_THROTTLING_OF_MERGE_FEED_OPS); + this.inhibitDefaultMergesWhenGlobalMergesPending = flagValue(source, appId, version, Flags.INHIBIT_DEFAULT_MERGES_WHEN_GLOBAL_MERGES_PENDING); + this.useQrserverServiceName = flagValue(source, appId, version, Flags.USE_QRSERVER_SERVICE_NAME); + this.avoidRenamingSummaryFeatures = flagValue(source, appId, version, Flags.AVOID_RENAMING_SUMMARY_FEATURES); + this.mergeGroupingResultInSearchInvoker = flagValue(source, appId, version, Flags.MERGE_GROUPING_RESULT_IN_SEARCH_INVOKER); + this.experimentalSdParsing = flagValue(source, appId, version, Flags.EXPERIMENTAL_SD_PARSING); } @Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; } @@ -312,33 +312,38 @@ public class ModelContextImpl implements ModelContext { @Override public boolean mergeGroupingResultInSearchInvoker() { return mergeGroupingResultInSearchInvoker; } @Override public boolean experimentalSdParsing() { return experimentalSdParsing; } - private static <V> V flagValue(FlagSource source, ApplicationId appId, UnboundFlag<? extends V, ?, ?> flag) { + private static <V> V flagValue(FlagSource source, ApplicationId appId, Version vespaVersion, UnboundFlag<? extends V, ?, ?> flag) { return flag.bindTo(source) .with(FetchVector.Dimension.APPLICATION_ID, appId.serializedForm()) + .with(FetchVector.Dimension.VESPA_VERSION, vespaVersion.toFullString()) .boxedValue(); } - private static <V> V flagValue(FlagSource source, TenantName tenant, UnboundFlag<? extends V, ?, ?> flag) { + private static <V> V flagValue(FlagSource source, TenantName tenant, Version vespaVersion, UnboundFlag<? extends V, ?, ?> flag) { return flag.bindTo(source) .with(FetchVector.Dimension.TENANT_ID, tenant.value()) + .with(FetchVector.Dimension.VESPA_VERSION, vespaVersion.toFullString()) .boxedValue(); } private static <V> V flagValue(FlagSource source, ApplicationId appId, + Version vespaVersion, ClusterSpec.Type clusterType, UnboundFlag<? extends V, ?, ?> flag) { return flag.bindTo(source) .with(FetchVector.Dimension.APPLICATION_ID, appId.serializedForm()) .with(FetchVector.Dimension.CLUSTER_TYPE, clusterType.name()) + .with(FetchVector.Dimension.VESPA_VERSION, vespaVersion.toFullString()) .boxedValue(); } static int flagValueAsInt(FlagSource source, ApplicationId appId, + Version version, ClusterSpec.Type clusterType, UnboundFlag<? extends Boolean, ?, ?> flag) { - return flagValue(source, appId, clusterType, flag) ? 1 : 0; + return flagValue(source, appId, version, clusterType, flag) ? 1 : 0; } private String translateJvmOmitStackTraceInFastThrowIntToString(ToIntFunction<ClusterSpec.Type> function, @@ -375,6 +380,7 @@ public class ModelContextImpl implements ModelContext { private final List<String> environmentVariables; public Properties(ApplicationId applicationId, + Version nodeVespaVersion, ConfigserverConfig configserverConfig, Zone zone, Set<ContainerEndpoint> endpoints, @@ -387,7 +393,7 @@ public class ModelContextImpl implements ModelContext { List<TenantSecretStore> tenantSecretStores, SecretStore secretStore, List<X509Certificate> operatorCertificates) { - this.featureFlags = new FeatureFlags(flagSource, applicationId); + this.featureFlags = new FeatureFlags(flagSource, applicationId, nodeVespaVersion); this.applicationId = applicationId; this.multitenant = configserverConfig.multitenant() || configserverConfig.hostedVespa() || Boolean.getBoolean("multitenant"); this.configServerSpecs = fromConfig(configserverConfig); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/ActivatedModelsBuilder.java b/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/ActivatedModelsBuilder.java index 54ceb394ee6..f2d9eb835be 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/ActivatedModelsBuilder.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/modelfactory/ActivatedModelsBuilder.java @@ -102,7 +102,7 @@ public class ActivatedModelsBuilder extends ModelsBuilder<Application> { ) { log.log(Level.FINE, () -> String.format("Loading model version %s for session %s application %s", modelFactory.version(), applicationGeneration, applicationId)); - ModelContext.Properties modelContextProperties = createModelContextProperties(applicationId, applicationPackage); + ModelContext.Properties modelContextProperties = createModelContextProperties(applicationId, wantedNodeVespaVersion, applicationPackage); Provisioned provisioned = new Provisioned(); ModelContext modelContext = new ModelContextImpl( applicationPackage, @@ -146,8 +146,9 @@ public class ActivatedModelsBuilder extends ModelsBuilder<Application> { return Optional.of(value); } - private ModelContext.Properties createModelContextProperties(ApplicationId applicationId, ApplicationPackage applicationPackage) { + private ModelContext.Properties createModelContextProperties(ApplicationId applicationId, Version wantedNodeVespaVersion, ApplicationPackage applicationPackage) { return new ModelContextImpl.Properties(applicationId, + wantedNodeVespaVersion, configserverConfig, zone(), ImmutableSet.copyOf(new ContainerEndpointsCache(TenantRepository.getTenantPath(tenant), curator).read(applicationId)), diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionPreparer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionPreparer.java index f8ee3e5e0c9..8293871335d 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionPreparer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionPreparer.java @@ -191,6 +191,7 @@ public class SessionPreparer { this.containerEndpoints = readEndpointsIfNull(params.containerEndpoints()); this.athenzDomain = params.athenzDomain(); this.properties = new ModelContextImpl.Properties(params.getApplicationId(), + vespaVersion, configserverConfig, zone, Set.copyOf(containerEndpoints), diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java index 659bc771ac7..41e6fe98441 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/ModelContextImplTest.java @@ -63,6 +63,7 @@ public class ModelContextImplTest { new Provisioned(), new ModelContextImpl.Properties( ApplicationId.defaultId(), + Version.emptyVersion, configserverConfig, Zone.defaultZone(), endpoints, diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/application/DeactivatedContainer.java b/jdisc_core/src/main/java/com/yahoo/jdisc/application/DeactivatedContainer.java index f85f626d4b7..fa749d924d1 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/application/DeactivatedContainer.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/application/DeactivatedContainer.java @@ -29,7 +29,7 @@ public interface DeactivatedContainer { * DeactivatedContainer is considered to have terminated once there are no more {@link Request}s, {@link Response}s * or corresponding {@link ContentChannel}s being processed by components that belong to it.</p> * - * <p>If termination has already occured, this method immediately runs the given Runnable in the current thread.</p> + * <p>If termination has already occurred, this method immediately runs the given Runnable in the current thread.</p> * * @param task The task to run once this DeactivatedContainer has terminated. */ diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index 20aba93d4d7..2e53db3bd87 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -490,7 +490,7 @@ IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index) args->old_absolute_id = _current_index_id + _last_fusion_id; args->old_source_list = _source_list; string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id)); - args->flush_serial_num = _current_serial_num; + args->flush_serial_num = current_serial_num(); { LockGuard lock(_index_update_lock); // Handover of extra memory indexes to flush @@ -795,7 +795,7 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex) args._oldSourceList = _source_list; // Delay destruction uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id; string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId)); - SerialNum freezeSerialNum = _current_serial_num; + SerialNum freezeSerialNum = current_serial_num(); bool dropEmptyLast = false; SaveInfo::UP saveInfo; @@ -921,7 +921,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, _flush_serial_num = IndexReadUtilities::readSerialNum(latest_index_dir); _lastFlushTime = search::FileKit::getModificationTime(latest_index_dir); - _current_serial_num = _flush_serial_num; + set_current_serial_num(_flush_serial_num); const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir); _selector = FixedSourceSelector::load(selector, _next_id - 1); } else { @@ -941,7 +941,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); _selector->setDefaultSource(_current_index_id); auto sourceList = loadDiskIndexes(spec, std::make_unique<IndexCollection>(_selector)); - _current_index = operations.createMemoryIndex(_schema, *sourceList, _current_serial_num); + _current_index = operations.createMemoryIndex(_schema, *sourceList, current_serial_num()); LOG(debug, "Index manager created with flushed serial num %" PRIu64, _flush_serial_num); sourceList->append(_current_index_id, _current_index); sourceList->setCurrentIndex(_current_index_id); @@ -966,10 +966,10 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat assert(_ctx.getThreadingService().master().isCurrentThread()); // while flush engine scheduler thread waits { LockGuard lock(_index_update_lock); - _current_serial_num = std::max(_current_serial_num, serialNum); + set_current_serial_num(std::max(current_serial_num(), serialNum)); } - IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, _current_serial_num)); + IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num())); FlushArgs args; args.stats = stats; // Ensure that all index thread tasks accessing memory index have completed. @@ -984,7 +984,7 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat if (args._skippedEmptyLast && args._extraIndexes.empty()) { // No memory index to flush, it was empty LockGuard lock(_state_lock); - _flush_serial_num = _current_serial_num; + _flush_serial_num = current_serial_num(); _lastFlushTime = vespalib::system_clock::now(); LOG(debug, "No memory index to flush. Update serial number and flush time to current: " "flushSerialNum(%" PRIu64 "), lastFlushTime(%f)", @@ -1014,7 +1014,7 @@ IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushTok // XXX: Claims to have flushed memory index when starting fusion. { LockGuard lock(_index_update_lock); - _current_serial_num = std::max(_current_serial_num, serialNum); + set_current_serial_num(std::max(current_serial_num(), serialNum)); } FusionSpec spec; @@ -1234,7 +1234,7 @@ IndexMaintainer::putDocument(uint32_t lid, const Document &doc, SerialNum serial _selector->setSource(lid, _current_index_id); _source_list->setSource(lid); ++_source_selector_changes; - _current_serial_num = serialNum; + set_current_serial_num(serialNum); } void @@ -1247,7 +1247,7 @@ IndexMaintainer::removeDocuments(LidVector lids, SerialNum serialNum) _source_list->setSource(lid); } _source_selector_changes += lids.size(); - _current_serial_num = serialNum; + set_current_serial_num(serialNum); _current_index->removeDocuments(std::move(lids)); } @@ -1267,7 +1267,7 @@ IndexMaintainer::commit(vespalib::Gate& gate) // only triggered via commit_and_wait() assert(_ctx.getThreadingService().index().isCurrentThread()); LockGuard lock(_index_update_lock); - _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), _current_serial_num); + _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), current_serial_num()); } void @@ -1275,7 +1275,7 @@ IndexMaintainer::commit(SerialNum serialNum, OnWriteDoneType onWriteDone) { assert(_ctx.getThreadingService().index().isCurrentThread()); LockGuard lock(_index_update_lock); - _current_serial_num = serialNum; + set_current_serial_num(serialNum); _current_index->commit(onWriteDone, serialNum); } @@ -1284,7 +1284,7 @@ IndexMaintainer::heartBeat(SerialNum serialNum) { assert(_ctx.getThreadingService().index().isCurrentThread()); LockGuard lock(_index_update_lock); - _current_serial_num = serialNum; + set_current_serial_num(serialNum); } void @@ -1293,7 +1293,7 @@ IndexMaintainer::compactLidSpace(uint32_t lidLimit, SerialNum serialNum) assert(_ctx.getThreadingService().index().isCurrentThread()); LOG(info, "compactLidSpace(%u, %" PRIu64 ")", lidLimit, serialNum); LockGuard lock(_index_update_lock); - _current_serial_num = serialNum; + set_current_serial_num(serialNum); _selector->compactLidSpace(lidLimit); } @@ -1313,7 +1313,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) { assert(_ctx.getThreadingService().master().isCurrentThread()); pruneRemovedFields(schema, serialNum); - IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, _current_serial_num)); + IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num())); SetSchemaArgs args; args._newSchema = schema; diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h index b4bba209937..dfae5b4d643 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -18,6 +18,7 @@ #include <vespa/searchcorespi/flush/flushstats.h> #include <vespa/searchlib/attribute/fixedsourceselector.h> #include <vespa/searchlib/common/serialnum.h> +#include <atomic> namespace document { class Document; } @@ -87,14 +88,14 @@ class IndexMaintainer : public IIndexManager, // _selector is protected by SL + IUL ISourceSelector::SP _selector; ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL, only set by master thread - uint32_t _last_fusion_id; // Protected by SL + IUL - uint32_t _next_id; // Protected by SL + IUL - uint32_t _current_index_id; // Protected by SL + IUL - IMemoryIndex::SP _current_index; // Protected by SL + IUL - bool _flush_empty_current_index; - SerialNum _current_serial_num;// Protected by IUL - SerialNum _flush_serial_num; // Protected by SL - vespalib::system_time _lastFlushTime; // Protected by SL + uint32_t _last_fusion_id; // Protected by SL + IUL + uint32_t _next_id; // Protected by SL + IUL + uint32_t _current_index_id; // Protected by SL + IUL + IMemoryIndex::SP _current_index; // Protected by SL + IUL + bool _flush_empty_current_index; + std::atomic<SerialNum> _current_serial_num;// Protected by IUL + SerialNum _flush_serial_num; // Protected by SL + vespalib::system_time _lastFlushTime; // Protected by SL // Extra frozen memory indexes. This list is empty unless new // memory index has been added by force (due to config change or // data structure limitations). @@ -263,6 +264,12 @@ class IndexMaintainer : public IIndexManager, void commit_and_wait(); void commit(vespalib::Gate& gate); void pruneRemovedFields(const Schema &schema, SerialNum serialNum); + [[nodiscard]] SerialNum current_serial_num() const noexcept { + return _current_serial_num.load(std::memory_order_relaxed); + } + void set_current_serial_num(SerialNum new_serial_num) noexcept { + _current_serial_num.store(new_serial_num, std::memory_order_relaxed); + } public: IndexMaintainer(const IndexMaintainer &) = delete; @@ -270,7 +277,7 @@ public: IndexMaintainer(const IndexMaintainerConfig &config, const IndexMaintainerContext &context, IIndexMaintainerOperations &operations); - ~IndexMaintainer(); + ~IndexMaintainer() override; /** * Starts a new MemoryIndex, and dumps the previous one to disk. @@ -333,7 +340,7 @@ public: void compactLidSpace(uint32_t lidLimit, SerialNum serialNum) override; SerialNum getCurrentSerialNum() const override { - return _current_serial_num; + return _current_serial_num.load(std::memory_order_relaxed); } SerialNum getFlushedSerialNum() const override { diff --git a/vespalib/src/tests/portal/reactor/reactor_test.cpp b/vespalib/src/tests/portal/reactor/reactor_test.cpp index 82d4d8d3794..d7d61f9ee79 100644 --- a/vespalib/src/tests/portal/reactor/reactor_test.cpp +++ b/vespalib/src/tests/portal/reactor/reactor_test.cpp @@ -42,15 +42,13 @@ void wait_tick() { } } -struct SimpleHandler : Reactor::EventHandler { +struct HandlerBase : Reactor::EventHandler { SocketPair sockets; std::atomic<size_t> read_cnt; std::atomic<size_t> write_cnt; - Reactor::Token::UP token; - SimpleHandler(Reactor &reactor, bool read, bool write) - : sockets(), read_cnt(0), write_cnt(0), token() + HandlerBase() + : sockets(), read_cnt(0), write_cnt(0) { - token = reactor.attach(*this, sockets.main.get(), read, write); } void handle_event(bool read, bool write) override { if (read) { @@ -68,16 +66,33 @@ struct SimpleHandler : Reactor::EventHandler { EXPECT_EQUAL((read_sample != read_cnt), read); EXPECT_EQUAL((write_sample != write_cnt), write); } + ~HandlerBase(); +}; +HandlerBase::~HandlerBase() = default; + +struct SimpleHandler : HandlerBase { + Reactor::Token::UP token; + SimpleHandler(Reactor &reactor, bool read, bool write) + : HandlerBase(), token() + { + token = reactor.attach(*this, sockets.main.get(), read, write); + } ~SimpleHandler(); }; SimpleHandler::~SimpleHandler() = default; -struct DeletingHandler : SimpleHandler { +struct DeletingHandler : HandlerBase { + Gate allow_delete; Gate token_deleted; - DeletingHandler(Reactor &reactor) : SimpleHandler(reactor, true, true), - token_deleted() {} + Reactor::Token::UP token; + DeletingHandler(Reactor &reactor) + : HandlerBase(), allow_delete(), token_deleted(), token() + { + token = reactor.attach(*this, sockets.main.get(), true, true); + } void handle_event(bool read, bool write) override { - SimpleHandler::handle_event(read, write); + HandlerBase::handle_event(read, write); + allow_delete.await(); token.reset(); token_deleted.countDown(); } @@ -85,14 +100,18 @@ struct DeletingHandler : SimpleHandler { }; DeletingHandler::~DeletingHandler() = default; -struct WaitingHandler : SimpleHandler { +struct WaitingHandler : HandlerBase { Gate enter_callback; Gate exit_callback; - WaitingHandler(Reactor &reactor) : SimpleHandler(reactor, true, true), - enter_callback(), exit_callback() {} + Reactor::Token::UP token; + WaitingHandler(Reactor &reactor) + : HandlerBase(), enter_callback(), exit_callback(), token() + { + token = reactor.attach(*this, sockets.main.get(), true, true); + } void handle_event(bool read, bool write) override { enter_callback.countDown(); - SimpleHandler::handle_event(read, write); + HandlerBase::handle_event(read, write); exit_callback.await(); } ~WaitingHandler(); @@ -135,6 +154,7 @@ TEST_FF("require that deleting reactor token disables io events", Reactor(tick), TEST_FF("require that reactor token can be destroyed during io event handling", Reactor(tick), TimeBomb(60)) { DeletingHandler handler(f1); + handler.allow_delete.countDown(); handler.token_deleted.await(); TEST_DO(handler.verify(false, false)); EXPECT_EQUAL(handler.read_cnt, 1u); diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index d226cfcd43e..2b5395773ee 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.curator; import com.google.inject.Inject; import com.yahoo.cloud.config.CuratorConfig; +import com.yahoo.component.AbstractComponent; import com.yahoo.path.Path; import com.yahoo.vespa.curator.api.VespaCurator; import com.yahoo.vespa.curator.recipes.CuratorCounter; @@ -49,7 +50,7 @@ import java.util.logging.Logger; * @author vegardh * @author bratseth */ -public class Curator implements VespaCurator, AutoCloseable { +public class Curator extends AbstractComponent implements VespaCurator, AutoCloseable { private static final Logger LOG = Logger.getLogger(Curator.class.getName()); private static final File ZK_CLIENT_CONFIG_FILE = new File(Defaults.getDefaults().underVespaHome("conf/zookeeper/zookeeper-client.cfg")); @@ -82,7 +83,6 @@ public class Curator implements VespaCurator, AutoCloseable { } @Inject - // TODO jonmv: Use a Provider for this, due to required shutdown. public Curator(CuratorConfig curatorConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { // Depends on ZooKeeperServer to make sure it is started first this(ConnectionSpec.create(curatorConfig.server(), @@ -316,6 +316,11 @@ public class Curator implements VespaCurator, AutoCloseable { curatorFramework.close(); } + @Override + public void deconstruct() { + close(); + } + /** * Interface for waiting for completion of an operation */ diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java index 604419c063d..389df340ca7 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java @@ -29,8 +29,7 @@ public class Reconfigurer extends AbstractComponent { private static final Logger log = java.util.logging.Logger.getLogger(Reconfigurer.class.getName()); - private static final Duration MIN_TIMEOUT = Duration.ofMinutes(3); - private static final Duration NODE_TIMEOUT = Duration.ofMinutes(1); + private static final Duration TIMEOUT = Duration.ofMinutes(3); private final ExponentialBackoff backoff = new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10)); private final VespaZooKeeperAdmin vespaZooKeeperAdmin; @@ -95,7 +94,7 @@ public class Reconfigurer extends AbstractComponent { "\nServers in new config:" + servers(newConfig)); String connectionSpec = localConnectionSpec(activeConfig); Instant now = Instant.now(); - Duration reconfigTimeout = reconfigTimeout(newConfig.server().size()); + Duration reconfigTimeout = reconfigTimeout(); Instant end = now.plus(reconfigTimeout); // Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed for (int attempt = 1; now.isBefore(end); attempt++) { @@ -129,11 +128,10 @@ public class Reconfigurer extends AbstractComponent { Process.logAndDie("Reconfiguration did not complete within timeout " + reconfigTimeout + ". Forcing container shutdown."); } - /** Returns the timeout to use for the given joining server count */ - private static Duration reconfigTimeout(int joiningServers) { + private static Duration reconfigTimeout() { // For reconfig to succeed, the current and resulting ensembles must have a majority. When an ensemble grows and // the joining servers outnumber the existing ones, we have to wait for enough of them to start to have a majority. - return Duration.ofMillis(Math.max(joiningServers * NODE_TIMEOUT.toMillis(), MIN_TIMEOUT.toMillis())); + return TIMEOUT; } private static String localConnectionSpec(ZookeeperServerConfig config) { |