diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-01-07 10:42:15 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-01-07 11:49:22 +0100 |
commit | 132a570ef9da7822198396079b8c0c9e27c81755 (patch) | |
tree | ceeaf8f24019ceeff31a6d7b88c1b528aa35eb61 /configserver | |
parent | 47cb2ffe916c282fb27f622ebca938952727c7b1 (diff) |
If generation has changed, you might have been left behind.
If so run a second train that will pick up any remaining passengers.
Diffstat (limited to 'configserver')
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java | 33 | ||||
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java | 37 |
2 files changed, 55 insertions, 15 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java index 7fa60860b44..db863064834 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.config.server.rpc; import com.yahoo.cloud.config.SentinelConfig; +import com.yahoo.collections.Pair; import com.yahoo.config.provision.TenantName; import com.yahoo.component.Version; import com.yahoo.jrt.Request; @@ -66,13 +67,13 @@ class GetConfigProcessor implements Runnable { } // TODO: Increment statistics (Metrics) failed counters when requests fail - public void run() { + public Pair<GetConfigContext, Long> getConfig(JRTServerConfigRequest request) { //Request has already been detached if ( ! request.validateParameters()) { // Error code is set in verifyParameters if parameters are not OK. log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); respond(request); - return; + return null; } Trace trace = request.getRequestTrace(); if (logDebug(trace)) { @@ -85,13 +86,13 @@ class GetConfigProcessor implements Runnable { // fabricate an empty request to cause the sentinel to stop all running services if (rpcServer.isHostedVespa() && rpcServer.allTenantsLoaded() && !tenant.isPresent() && isSentinelConfigRequest(request)) { returnEmpty(request); - return; + return null; } GetConfigContext context = rpcServer.createGetConfigContext(tenant, request, trace); if (context == null || ! context.requestHandler().hasApplication(context.applicationId(), Optional.empty())) { handleError(request, ErrorCode.APPLICATION_NOT_LOADED, "No application exists"); - return; + return null; } Optional<Version> vespaVersion = rpcServer.useRequestVersion() ? @@ -103,7 +104,7 @@ class GetConfigProcessor implements Runnable { if ( ! context.requestHandler().hasApplication(context.applicationId(), vespaVersion)) { handleError(request, ErrorCode.UNKNOWN_VESPA_VERSION, "Unknown Vespa version in request: " + getPrintableVespaVersion(vespaVersion)); - return; + return null; } this.logPre = TenantRepository.logPre(context.applicationId()); @@ -112,14 +113,14 @@ class GetConfigProcessor implements Runnable { config = rpcServer.resolveConfig(request, context, vespaVersion); } catch (UnknownConfigDefinitionException e) { handleError(request, ErrorCode.UNKNOWN_DEFINITION, "Unknown config definition " + request.getConfigKey()); - return; + return null; } catch (UnknownConfigIdException e) { handleError(request, ErrorCode.ILLEGAL_CONFIGID, "Illegal config id " + request.getConfigKey().getConfigId()); - return; + return null; } catch (Throwable e) { log.log(Level.SEVERE, "Unexpected error handling config request", e); handleError(request, ErrorCode.INTERNAL_ERROR, "Internal error " + e.getMessage()); - return; + return null; } // config == null is not an error, but indicates that the config will be returned later. @@ -134,7 +135,21 @@ class GetConfigProcessor implements Runnable { if (logDebug(trace)) { debugLog(trace, "delaying response " + request.getShortDescription()); } - rpcServer.delayResponse(request, context); + return new Pair<>(context, config != null ? config.getGeneration() : 0); + } + return null; + } + @Override + public void run() { + Pair<GetConfigContext, Long> delayed = getConfig(request); + + if (delayed != null) { + rpcServer.delayResponse(request, delayed.getFirst()); + if (rpcServer.hasNewerGeneration(delayed.getFirst().applicationId(), delayed.getSecond())) { + // This will ensure that if the reload train left the station while I was boarding, another train will + // immediately be scheduled. + rpcServer.configReloaded(delayed.getFirst().applicationId()); + } } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index a7d603fe53f..c16303fbc88 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -53,7 +53,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorCompletionService; @@ -61,6 +60,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import java.util.stream.Stream; @@ -91,6 +91,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { private final HostRegistry<TenantName> hostRegistry; private final Map<TenantName, TenantHandlerProvider> tenantProviders = new ConcurrentHashMap<>(); + private final Map<ApplicationId, ApplicationState> applicationStateMap = new ConcurrentHashMap<>(); private final SuperModelRequestHandler superModelRequestHandler; private final MetricUpdater metrics; private final MetricUpdaterFactory metricUpdaterFactory; @@ -102,6 +103,14 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { private volatile boolean allTenantsLoaded = false; private boolean isRunning = false; + class ApplicationState { + private final AtomicLong activeGeneration = new AtomicLong(0); + ApplicationState(long generation) { + activeGeneration.set(generation); + } + long getActiveGeneration() { return activeGeneration.get(); } + void setActiveGeneration(long generation) { activeGeneration.set(generation); } + } /** * Creates an RpcServer listening on the specified <code>port</code>. * @@ -215,6 +224,17 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { .returnDesc(0, "ret", "0 if success, 1 otherwise")); } + private ApplicationState getState(ApplicationId id) { + ApplicationState state = applicationStateMap.get(id); + if (state == null) { + applicationStateMap.putIfAbsent(id, new ApplicationState(0)); + state = applicationStateMap.get(id); + } + return state; + } + boolean hasNewerGeneration(ApplicationId id, long generation) { + return getState(id).getActiveGeneration() > generation; + } /** * Checks all delayed responses for config changes and waits until all has been answered. * This method should be called when config is reloaded in the server. @@ -222,16 +242,20 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @Override public void configActivated(ApplicationSet applicationSet) { ApplicationId applicationId = applicationSet.getId(); - configReloaded(delayedConfigResponses.drainQueue(applicationId), TenantRepository.logPre(applicationId)); + ApplicationState state = getState(applicationId); + state.setActiveGeneration(applicationSet.getApplicationGeneration()); + configReloaded(applicationId); reloadSuperModel(applicationSet); } private void reloadSuperModel(ApplicationSet applicationSet) { superModelRequestHandler.reloadConfig(applicationSet); - configReloaded(delayedConfigResponses.drainQueue(ApplicationId.global()), TenantRepository.logPre(ApplicationId.global())); + configReloaded(ApplicationId.global()); } - private void configReloaded(List<DelayedConfigResponses.DelayedConfigResponse> responses, String logPre) { + void configReloaded(ApplicationId applicationId) { + List<DelayedConfigResponses.DelayedConfigResponse> responses = delayedConfigResponses.drainQueue(applicationId); + String logPre = TenantRepository.logPre(applicationId); if (log.isLoggable(LogLevel.DEBUG)) { log.log(LogLevel.DEBUG, logPre + "Start of configReload: " + responses.size() + " requests on delayed requests queue"); } @@ -286,8 +310,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @Override public void applicationRemoved(ApplicationId applicationId) { superModelRequestHandler.removeApplication(applicationId); - configReloaded(delayedConfigResponses.drainQueue(applicationId), TenantRepository.logPre(applicationId)); - configReloaded(delayedConfigResponses.drainQueue(ApplicationId.global()), TenantRepository.logPre(ApplicationId.global())); + configReloaded(applicationId); + configReloaded(ApplicationId.global()); } public void respond(JRTServerConfigRequest request) { @@ -533,4 +557,5 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { new FileReferenceDownload(fileReference, false /* downloadFromOtherSourceIfNotFound */))); req.returnValues().add(new Int32Value(0)); } + } |