summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorHarald Musum <musum@oath.com>2019-01-07 12:28:03 +0100
committerGitHub <noreply@github.com>2019-01-07 12:28:03 +0100
commit2f58f9a08d1c8dd7344aba9a8d0204f35b329d16 (patch)
tree2fb92e62269de68f1f6c58fc10e1dc50bda1e083 /configserver
parentd14466361287e45e4c12021d6c1497bb63ed4ce1 (diff)
parent132a570ef9da7822198396079b8c0c9e27c81755 (diff)
Merge pull request #8037 from vespa-engine/balder/ensure-request-is-not-forgotten
Balder/ensure request is not forgotten
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java23
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java33
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java72
3 files changed, 81 insertions, 47 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java
index 5fae6e23323..f58c6cfdee5 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java
@@ -45,7 +45,7 @@ public class DelayedConfigResponses {
private final Map<ApplicationId, BlockingQueue<DelayedConfigResponse>> delayedResponses =
new ConcurrentHashMap<>();
- public DelayedConfigResponses(RpcServer rpcServer, int numTimerThreads) {
+ DelayedConfigResponses(RpcServer rpcServer, int numTimerThreads) {
this(rpcServer, numTimerThreads, true);
}
@@ -70,19 +70,20 @@ public class DelayedConfigResponses {
* The run method of this class is run by a Timer when the timeout expires.
* The timer associated with this response must be cancelled first.
*/
- public class DelayedConfigResponse implements Runnable, TargetWatcher {
+ class DelayedConfigResponse implements Runnable, TargetWatcher {
final JRTServerConfigRequest request;
private final BlockingQueue<DelayedConfigResponse> delayedResponsesQueue;
private final ApplicationId app;
private ScheduledFuture<?> future;
- public DelayedConfigResponse(JRTServerConfigRequest req, BlockingQueue<DelayedConfigResponse> delayedResponsesQueue, ApplicationId app) {
+ DelayedConfigResponse(JRTServerConfigRequest req, BlockingQueue<DelayedConfigResponse> delayedResponsesQueue, ApplicationId app) {
this.request = req;
this.delayedResponsesQueue = delayedResponsesQueue;
this.app = app;
}
+ @Override
public synchronized void run() {
removeFromQueue();
removeWatcher();
@@ -99,18 +100,17 @@ public class DelayedConfigResponses {
delayedResponsesQueue.remove(this);
}
- public JRTServerConfigRequest getRequest() {
+ JRTServerConfigRequest getRequest() {
return request;
}
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Delayed response for ").append(logPre()).append(request.getShortDescription());
return sb.toString();
}
- public ApplicationId getAppId() { return app; }
-
String logPre() {
return TenantRepository.logPre(app);
}
@@ -128,7 +128,7 @@ public class DelayedConfigResponses {
return future.cancel(false);
}
- public synchronized void schedule(long delay) throws InterruptedException {
+ synchronized void schedule(long delay) throws InterruptedException {
delayedResponsesQueue.put(this);
future = executorService.schedule(this, delay, TimeUnit.MILLISECONDS);
addWatcher();
@@ -167,7 +167,7 @@ public class DelayedConfigResponses {
*
* @param request a JRTConfigRequest
*/
- public final void delayResponse(JRTServerConfigRequest request, GetConfigContext context) {
+ final void delayResponse(JRTServerConfigRequest request, GetConfigContext context) {
if (request.isDelayedResponse()) {
log.log(LogLevel.DEBUG, context.logPre()+"Request already delayed");
} else {
@@ -203,7 +203,7 @@ public class DelayedConfigResponses {
}
}
- public void stop() {
+ void stop() {
executorService.shutdown();
}
@@ -212,7 +212,7 @@ public class DelayedConfigResponses {
*
* @return and array of DelayedConfigResponse objects
*/
- public List<DelayedConfigResponse> drainQueue(ApplicationId app) {
+ List<DelayedConfigResponse> drainQueue(ApplicationId app) {
ArrayList<DelayedConfigResponse> ret = new ArrayList<>();
if (delayedResponses.containsKey(app)) {
@@ -223,11 +223,12 @@ public class DelayedConfigResponses {
return ret;
}
+ @Override
public String toString() {
return "DelayedConfigResponses. Average Size=" + size();
}
- public int size() {
+ int size() {
int totalQueueSize = 0;
int numQueues = 0;
for (Map.Entry<ApplicationId, BlockingQueue<DelayedConfigResponse>> e : delayedResponses.entrySet()) {
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java
index 7fa60860b44..db863064834 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java
@@ -2,6 +2,7 @@
package com.yahoo.vespa.config.server.rpc;
import com.yahoo.cloud.config.SentinelConfig;
+import com.yahoo.collections.Pair;
import com.yahoo.config.provision.TenantName;
import com.yahoo.component.Version;
import com.yahoo.jrt.Request;
@@ -66,13 +67,13 @@ class GetConfigProcessor implements Runnable {
}
// TODO: Increment statistics (Metrics) failed counters when requests fail
- public void run() {
+ public Pair<GetConfigContext, Long> getConfig(JRTServerConfigRequest request) {
//Request has already been detached
if ( ! request.validateParameters()) {
// Error code is set in verifyParameters if parameters are not OK.
log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
respond(request);
- return;
+ return null;
}
Trace trace = request.getRequestTrace();
if (logDebug(trace)) {
@@ -85,13 +86,13 @@ class GetConfigProcessor implements Runnable {
// fabricate an empty request to cause the sentinel to stop all running services
if (rpcServer.isHostedVespa() && rpcServer.allTenantsLoaded() && !tenant.isPresent() && isSentinelConfigRequest(request)) {
returnEmpty(request);
- return;
+ return null;
}
GetConfigContext context = rpcServer.createGetConfigContext(tenant, request, trace);
if (context == null || ! context.requestHandler().hasApplication(context.applicationId(), Optional.empty())) {
handleError(request, ErrorCode.APPLICATION_NOT_LOADED, "No application exists");
- return;
+ return null;
}
Optional<Version> vespaVersion = rpcServer.useRequestVersion() ?
@@ -103,7 +104,7 @@ class GetConfigProcessor implements Runnable {
if ( ! context.requestHandler().hasApplication(context.applicationId(), vespaVersion)) {
handleError(request, ErrorCode.UNKNOWN_VESPA_VERSION, "Unknown Vespa version in request: " + getPrintableVespaVersion(vespaVersion));
- return;
+ return null;
}
this.logPre = TenantRepository.logPre(context.applicationId());
@@ -112,14 +113,14 @@ class GetConfigProcessor implements Runnable {
config = rpcServer.resolveConfig(request, context, vespaVersion);
} catch (UnknownConfigDefinitionException e) {
handleError(request, ErrorCode.UNKNOWN_DEFINITION, "Unknown config definition " + request.getConfigKey());
- return;
+ return null;
} catch (UnknownConfigIdException e) {
handleError(request, ErrorCode.ILLEGAL_CONFIGID, "Illegal config id " + request.getConfigKey().getConfigId());
- return;
+ return null;
} catch (Throwable e) {
log.log(Level.SEVERE, "Unexpected error handling config request", e);
handleError(request, ErrorCode.INTERNAL_ERROR, "Internal error " + e.getMessage());
- return;
+ return null;
}
// config == null is not an error, but indicates that the config will be returned later.
@@ -134,7 +135,21 @@ class GetConfigProcessor implements Runnable {
if (logDebug(trace)) {
debugLog(trace, "delaying response " + request.getShortDescription());
}
- rpcServer.delayResponse(request, context);
+ return new Pair<>(context, config != null ? config.getGeneration() : 0);
+ }
+ return null;
+ }
+ @Override
+ public void run() {
+ Pair<GetConfigContext, Long> delayed = getConfig(request);
+
+ if (delayed != null) {
+ rpcServer.delayResponse(request, delayed.getFirst());
+ if (rpcServer.hasNewerGeneration(delayed.getFirst().applicationId(), delayed.getSecond())) {
+ // This will ensure that if the reload train left the station while I was boarding, another train will
+ // immediately be scheduled.
+ rpcServer.configReloaded(delayed.getFirst().applicationId());
+ }
}
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index 1cae6ac2244..c16303fbc88 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -53,7 +53,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
@@ -61,6 +60,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.stream.Stream;
@@ -73,9 +73,9 @@ import java.util.stream.Stream;
// TODO: Split business logic out of this
public class RpcServer implements Runnable, ReloadListener, TenantListener {
- public static final String getConfigMethodName = "getConfigV3";
+ static final String getConfigMethodName = "getConfigV3";
- static final int TRACELEVEL = 6;
+ private static final int TRACELEVEL = 6;
static final int TRACELEVEL_DEBUG = 9;
private static final String THREADPOOL_NAME = "rpcserver worker pool";
private static final long SHUTDOWN_TIMEOUT = 60;
@@ -87,10 +87,11 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
private static final Logger log = Logger.getLogger(RpcServer.class.getName());
- final DelayedConfigResponses delayedConfigResponses;
+ private final DelayedConfigResponses delayedConfigResponses;
private final HostRegistry<TenantName> hostRegistry;
private final Map<TenantName, TenantHandlerProvider> tenantProviders = new ConcurrentHashMap<>();
+ private final Map<ApplicationId, ApplicationState> applicationStateMap = new ConcurrentHashMap<>();
private final SuperModelRequestHandler superModelRequestHandler;
private final MetricUpdater metrics;
private final MetricUpdaterFactory metricUpdaterFactory;
@@ -102,6 +103,14 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
private volatile boolean allTenantsLoaded = false;
private boolean isRunning = false;
+ class ApplicationState {
+ private final AtomicLong activeGeneration = new AtomicLong(0);
+ ApplicationState(long generation) {
+ activeGeneration.set(generation);
+ }
+ long getActiveGeneration() { return activeGeneration.get(); }
+ void setActiveGeneration(long generation) { activeGeneration.set(generation); }
+ }
/**
* Creates an RpcServer listening on the specified <code>port</code>.
*
@@ -114,7 +123,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
this.superModelRequestHandler = superModelRequestHandler;
metricUpdaterFactory = metrics;
supervisor.setMaxOutputBufferSize(config.maxoutputbuffersize());
- this.metrics = metrics.getOrCreateMetricUpdater(Collections.<String, String>emptyMap());
+ this.metrics = metrics.getOrCreateMetricUpdater(Collections.emptyMap());
this.hostLivenessTracker = hostLivenessTracker;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(config.maxgetconfigclients());
int numberOfRpcThreads = (config.numRpcThreads() == 0) ? Runtime.getRuntime().availableProcessors() : config.numRpcThreads();
@@ -161,7 +170,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
*
* @param req a Request
*/
- public final void printStatistics(Request req) {
+ @SuppressWarnings("UnusedDeclaration")
+ public void printStatistics(Request req) {
req.returnValues().add(new StringValue("Delayed responses queue size: " + delayedConfigResponses.size()));
}
@@ -214,6 +224,17 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
}
+ private ApplicationState getState(ApplicationId id) {
+ ApplicationState state = applicationStateMap.get(id);
+ if (state == null) {
+ applicationStateMap.putIfAbsent(id, new ApplicationState(0));
+ state = applicationStateMap.get(id);
+ }
+ return state;
+ }
+ boolean hasNewerGeneration(ApplicationId id, long generation) {
+ return getState(id).getActiveGeneration() > generation;
+ }
/**
* Checks all delayed responses for config changes and waits until all has been answered.
* This method should be called when config is reloaded in the server.
@@ -221,16 +242,20 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
@Override
public void configActivated(ApplicationSet applicationSet) {
ApplicationId applicationId = applicationSet.getId();
- configReloaded(delayedConfigResponses.drainQueue(applicationId), TenantRepository.logPre(applicationId));
+ ApplicationState state = getState(applicationId);
+ state.setActiveGeneration(applicationSet.getApplicationGeneration());
+ configReloaded(applicationId);
reloadSuperModel(applicationSet);
}
private void reloadSuperModel(ApplicationSet applicationSet) {
superModelRequestHandler.reloadConfig(applicationSet);
- configReloaded(delayedConfigResponses.drainQueue(ApplicationId.global()), TenantRepository.logPre(ApplicationId.global()));
+ configReloaded(ApplicationId.global());
}
- private void configReloaded(List<DelayedConfigResponses.DelayedConfigResponse> responses, String logPre) {
+ void configReloaded(ApplicationId applicationId) {
+ List<DelayedConfigResponses.DelayedConfigResponse> responses = delayedConfigResponses.drainQueue(applicationId);
+ String logPre = TenantRepository.logPre(applicationId);
if (log.isLoggable(LogLevel.DEBUG)) {
log.log(LogLevel.DEBUG, logPre + "Start of configReload: " + responses.size() + " requests on delayed requests queue");
}
@@ -285,8 +310,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
@Override
public void applicationRemoved(ApplicationId applicationId) {
superModelRequestHandler.removeApplication(applicationId);
- configReloaded(delayedConfigResponses.drainQueue(applicationId), TenantRepository.logPre(applicationId));
- configReloaded(delayedConfigResponses.drainQueue(ApplicationId.global()), TenantRepository.logPre(ApplicationId.global()));
+ configReloaded(applicationId);
+ configReloaded(ApplicationId.global());
}
public void respond(JRTServerConfigRequest request) {
@@ -300,7 +325,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
* Returns the tenant for this request, empty if there is no tenant for this request
* (which on hosted Vespa means that the requesting host is not currently active for any tenant)
*/
- public Optional<TenantName> resolveTenant(JRTServerConfigRequest request, Trace trace) {
+ Optional<TenantName> resolveTenant(JRTServerConfigRequest request, Trace trace) {
if ("*".equals(request.getConfigKey().getConfigId())) return Optional.of(ApplicationId.global().tenant());
String hostname = request.getClientHostName();
TenantName tenant = hostRegistry.getKeyForHost(hostname);
@@ -321,12 +346,12 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
return context.requestHandler().resolveConfig(context.applicationId(), request, vespaVersion);
}
- protected Supervisor getSupervisor() {
+ private Supervisor getSupervisor() {
return supervisor;
}
- Boolean addToRequestQueue(JRTServerConfigRequest request) {
- return addToRequestQueue(request, false, null);
+ private void addToRequestQueue(JRTServerConfigRequest request) {
+ addToRequestQueue(request, false, null);
}
public Boolean addToRequestQueue(JRTServerConfigRequest request, boolean forceResponse, CompletionService<Boolean> completionService) {
@@ -338,13 +363,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
if (completionService == null) {
executorService.submit(task);
} else {
- completionService.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- task.run();
- return true;
- }
- });
+ completionService.submit(() -> { task.run();return true;});
}
updateWorkQueueMetrics();
return true;
@@ -363,7 +382,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
/**
* Returns the context for this request, or null if the server is not properly set up with handlers
*/
- public GetConfigContext createGetConfigContext(Optional<TenantName> optionalTenant, JRTServerConfigRequest request, Trace trace) {
+ GetConfigContext createGetConfigContext(Optional<TenantName> optionalTenant, JRTServerConfigRequest request, Trace trace) {
if ("*".equals(request.getConfigKey().getConfigId())) {
return GetConfigContext.create(ApplicationId.global(), superModelRequestHandler, trace);
}
@@ -394,16 +413,14 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
return tenantProviders.get(tenant).getRequestHandler();
}
- public void delayResponse(JRTServerConfigRequest request, GetConfigContext context) {
+ void delayResponse(JRTServerConfigRequest request, GetConfigContext context) {
delayedConfigResponses.delayResponse(request, context);
}
@Override
public void onTenantDelete(TenantName tenant) {
log.log(LogLevel.DEBUG, TenantRepository.logPre(tenant)+"Tenant deleted, removing request handler and cleaning host registry");
- if (tenantProviders.containsKey(tenant)) {
- tenantProviders.remove(tenant);
- }
+ tenantProviders.remove(tenant);
hostRegistry.removeHostsForKey(tenant);
}
@@ -540,4 +557,5 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
new FileReferenceDownload(fileReference, false /* downloadFromOtherSourceIfNotFound */)));
req.returnValues().add(new Int32Value(0));
}
+
}