summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-01-07 10:42:15 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2019-01-07 11:49:22 +0100
commit132a570ef9da7822198396079b8c0c9e27c81755 (patch)
treeceeaf8f24019ceeff31a6d7b88c1b528aa35eb61 /configserver
parent47cb2ffe916c282fb27f622ebca938952727c7b1 (diff)
If generation has changed, you might have been left behind.
If so run a second train that will pick up any remaining passengers.
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java33
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java37
2 files changed, 55 insertions, 15 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java
index 7fa60860b44..db863064834 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java
@@ -2,6 +2,7 @@
package com.yahoo.vespa.config.server.rpc;
import com.yahoo.cloud.config.SentinelConfig;
+import com.yahoo.collections.Pair;
import com.yahoo.config.provision.TenantName;
import com.yahoo.component.Version;
import com.yahoo.jrt.Request;
@@ -66,13 +67,13 @@ class GetConfigProcessor implements Runnable {
}
// TODO: Increment statistics (Metrics) failed counters when requests fail
- public void run() {
+ public Pair<GetConfigContext, Long> getConfig(JRTServerConfigRequest request) {
//Request has already been detached
if ( ! request.validateParameters()) {
// Error code is set in verifyParameters if parameters are not OK.
log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
respond(request);
- return;
+ return null;
}
Trace trace = request.getRequestTrace();
if (logDebug(trace)) {
@@ -85,13 +86,13 @@ class GetConfigProcessor implements Runnable {
// fabricate an empty request to cause the sentinel to stop all running services
if (rpcServer.isHostedVespa() && rpcServer.allTenantsLoaded() && !tenant.isPresent() && isSentinelConfigRequest(request)) {
returnEmpty(request);
- return;
+ return null;
}
GetConfigContext context = rpcServer.createGetConfigContext(tenant, request, trace);
if (context == null || ! context.requestHandler().hasApplication(context.applicationId(), Optional.empty())) {
handleError(request, ErrorCode.APPLICATION_NOT_LOADED, "No application exists");
- return;
+ return null;
}
Optional<Version> vespaVersion = rpcServer.useRequestVersion() ?
@@ -103,7 +104,7 @@ class GetConfigProcessor implements Runnable {
if ( ! context.requestHandler().hasApplication(context.applicationId(), vespaVersion)) {
handleError(request, ErrorCode.UNKNOWN_VESPA_VERSION, "Unknown Vespa version in request: " + getPrintableVespaVersion(vespaVersion));
- return;
+ return null;
}
this.logPre = TenantRepository.logPre(context.applicationId());
@@ -112,14 +113,14 @@ class GetConfigProcessor implements Runnable {
config = rpcServer.resolveConfig(request, context, vespaVersion);
} catch (UnknownConfigDefinitionException e) {
handleError(request, ErrorCode.UNKNOWN_DEFINITION, "Unknown config definition " + request.getConfigKey());
- return;
+ return null;
} catch (UnknownConfigIdException e) {
handleError(request, ErrorCode.ILLEGAL_CONFIGID, "Illegal config id " + request.getConfigKey().getConfigId());
- return;
+ return null;
} catch (Throwable e) {
log.log(Level.SEVERE, "Unexpected error handling config request", e);
handleError(request, ErrorCode.INTERNAL_ERROR, "Internal error " + e.getMessage());
- return;
+ return null;
}
// config == null is not an error, but indicates that the config will be returned later.
@@ -134,7 +135,21 @@ class GetConfigProcessor implements Runnable {
if (logDebug(trace)) {
debugLog(trace, "delaying response " + request.getShortDescription());
}
- rpcServer.delayResponse(request, context);
+ return new Pair<>(context, config != null ? config.getGeneration() : 0);
+ }
+ return null;
+ }
+ @Override
+ public void run() {
+ Pair<GetConfigContext, Long> delayed = getConfig(request);
+
+ if (delayed != null) {
+ rpcServer.delayResponse(request, delayed.getFirst());
+ if (rpcServer.hasNewerGeneration(delayed.getFirst().applicationId(), delayed.getSecond())) {
+ // This will ensure that if the reload train left the station while I was boarding, another train will
+ // immediately be scheduled.
+ rpcServer.configReloaded(delayed.getFirst().applicationId());
+ }
}
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index a7d603fe53f..c16303fbc88 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -53,7 +53,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
@@ -61,6 +60,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.stream.Stream;
@@ -91,6 +91,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
private final HostRegistry<TenantName> hostRegistry;
private final Map<TenantName, TenantHandlerProvider> tenantProviders = new ConcurrentHashMap<>();
+ private final Map<ApplicationId, ApplicationState> applicationStateMap = new ConcurrentHashMap<>();
private final SuperModelRequestHandler superModelRequestHandler;
private final MetricUpdater metrics;
private final MetricUpdaterFactory metricUpdaterFactory;
@@ -102,6 +103,14 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
private volatile boolean allTenantsLoaded = false;
private boolean isRunning = false;
+ class ApplicationState {
+ private final AtomicLong activeGeneration = new AtomicLong(0);
+ ApplicationState(long generation) {
+ activeGeneration.set(generation);
+ }
+ long getActiveGeneration() { return activeGeneration.get(); }
+ void setActiveGeneration(long generation) { activeGeneration.set(generation); }
+ }
/**
* Creates an RpcServer listening on the specified <code>port</code>.
*
@@ -215,6 +224,17 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
}
+ private ApplicationState getState(ApplicationId id) {
+ ApplicationState state = applicationStateMap.get(id);
+ if (state == null) {
+ applicationStateMap.putIfAbsent(id, new ApplicationState(0));
+ state = applicationStateMap.get(id);
+ }
+ return state;
+ }
+ boolean hasNewerGeneration(ApplicationId id, long generation) {
+ return getState(id).getActiveGeneration() > generation;
+ }
/**
* Checks all delayed responses for config changes and waits until all has been answered.
* This method should be called when config is reloaded in the server.
@@ -222,16 +242,20 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
@Override
public void configActivated(ApplicationSet applicationSet) {
ApplicationId applicationId = applicationSet.getId();
- configReloaded(delayedConfigResponses.drainQueue(applicationId), TenantRepository.logPre(applicationId));
+ ApplicationState state = getState(applicationId);
+ state.setActiveGeneration(applicationSet.getApplicationGeneration());
+ configReloaded(applicationId);
reloadSuperModel(applicationSet);
}
private void reloadSuperModel(ApplicationSet applicationSet) {
superModelRequestHandler.reloadConfig(applicationSet);
- configReloaded(delayedConfigResponses.drainQueue(ApplicationId.global()), TenantRepository.logPre(ApplicationId.global()));
+ configReloaded(ApplicationId.global());
}
- private void configReloaded(List<DelayedConfigResponses.DelayedConfigResponse> responses, String logPre) {
+ void configReloaded(ApplicationId applicationId) {
+ List<DelayedConfigResponses.DelayedConfigResponse> responses = delayedConfigResponses.drainQueue(applicationId);
+ String logPre = TenantRepository.logPre(applicationId);
if (log.isLoggable(LogLevel.DEBUG)) {
log.log(LogLevel.DEBUG, logPre + "Start of configReload: " + responses.size() + " requests on delayed requests queue");
}
@@ -286,8 +310,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
@Override
public void applicationRemoved(ApplicationId applicationId) {
superModelRequestHandler.removeApplication(applicationId);
- configReloaded(delayedConfigResponses.drainQueue(applicationId), TenantRepository.logPre(applicationId));
- configReloaded(delayedConfigResponses.drainQueue(ApplicationId.global()), TenantRepository.logPre(ApplicationId.global()));
+ configReloaded(applicationId);
+ configReloaded(ApplicationId.global());
}
public void respond(JRTServerConfigRequest request) {
@@ -533,4 +557,5 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
new FileReferenceDownload(fileReference, false /* downloadFromOtherSourceIfNotFound */)));
req.returnValues().add(new Int32Value(0));
}
+
}