diff options
author | Harald Musum <musum@oath.com> | 2019-01-07 12:28:03 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-07 12:28:03 +0100 |
commit | 2f58f9a08d1c8dd7344aba9a8d0204f35b329d16 (patch) | |
tree | 2fb92e62269de68f1f6c58fc10e1dc50bda1e083 /configserver | |
parent | d14466361287e45e4c12021d6c1497bb63ed4ce1 (diff) | |
parent | 132a570ef9da7822198396079b8c0c9e27c81755 (diff) |
Merge pull request #8037 from vespa-engine/balder/ensure-request-is-not-forgotten
Balder/ensure request is not forgotten
Diffstat (limited to 'configserver')
3 files changed, 81 insertions, 47 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java index 5fae6e23323..f58c6cfdee5 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java @@ -45,7 +45,7 @@ public class DelayedConfigResponses { private final Map<ApplicationId, BlockingQueue<DelayedConfigResponse>> delayedResponses = new ConcurrentHashMap<>(); - public DelayedConfigResponses(RpcServer rpcServer, int numTimerThreads) { + DelayedConfigResponses(RpcServer rpcServer, int numTimerThreads) { this(rpcServer, numTimerThreads, true); } @@ -70,19 +70,20 @@ public class DelayedConfigResponses { * The run method of this class is run by a Timer when the timeout expires. * The timer associated with this response must be cancelled first. */ - public class DelayedConfigResponse implements Runnable, TargetWatcher { + class DelayedConfigResponse implements Runnable, TargetWatcher { final JRTServerConfigRequest request; private final BlockingQueue<DelayedConfigResponse> delayedResponsesQueue; private final ApplicationId app; private ScheduledFuture<?> future; - public DelayedConfigResponse(JRTServerConfigRequest req, BlockingQueue<DelayedConfigResponse> delayedResponsesQueue, ApplicationId app) { + DelayedConfigResponse(JRTServerConfigRequest req, BlockingQueue<DelayedConfigResponse> delayedResponsesQueue, ApplicationId app) { this.request = req; this.delayedResponsesQueue = delayedResponsesQueue; this.app = app; } + @Override public synchronized void run() { removeFromQueue(); removeWatcher(); @@ -99,18 +100,17 @@ public class DelayedConfigResponses { delayedResponsesQueue.remove(this); } - public JRTServerConfigRequest getRequest() { + JRTServerConfigRequest getRequest() { return request; } + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Delayed response for ").append(logPre()).append(request.getShortDescription()); return sb.toString(); } - public ApplicationId getAppId() { return app; } - String logPre() { return TenantRepository.logPre(app); } @@ -128,7 +128,7 @@ public class DelayedConfigResponses { return future.cancel(false); } - public synchronized void schedule(long delay) throws InterruptedException { + synchronized void schedule(long delay) throws InterruptedException { delayedResponsesQueue.put(this); future = executorService.schedule(this, delay, TimeUnit.MILLISECONDS); addWatcher(); @@ -167,7 +167,7 @@ public class DelayedConfigResponses { * * @param request a JRTConfigRequest */ - public final void delayResponse(JRTServerConfigRequest request, GetConfigContext context) { + final void delayResponse(JRTServerConfigRequest request, GetConfigContext context) { if (request.isDelayedResponse()) { log.log(LogLevel.DEBUG, context.logPre()+"Request already delayed"); } else { @@ -203,7 +203,7 @@ public class DelayedConfigResponses { } } - public void stop() { + void stop() { executorService.shutdown(); } @@ -212,7 +212,7 @@ public class DelayedConfigResponses { * * @return and array of DelayedConfigResponse objects */ - public List<DelayedConfigResponse> drainQueue(ApplicationId app) { + List<DelayedConfigResponse> drainQueue(ApplicationId app) { ArrayList<DelayedConfigResponse> ret = new ArrayList<>(); if (delayedResponses.containsKey(app)) { @@ -223,11 +223,12 @@ public class DelayedConfigResponses { return ret; } + @Override public String toString() { return "DelayedConfigResponses. Average Size=" + size(); } - public int size() { + int size() { int totalQueueSize = 0; int numQueues = 0; for (Map.Entry<ApplicationId, BlockingQueue<DelayedConfigResponse>> e : delayedResponses.entrySet()) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java index 7fa60860b44..db863064834 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/GetConfigProcessor.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.config.server.rpc; import com.yahoo.cloud.config.SentinelConfig; +import com.yahoo.collections.Pair; import com.yahoo.config.provision.TenantName; import com.yahoo.component.Version; import com.yahoo.jrt.Request; @@ -66,13 +67,13 @@ class GetConfigProcessor implements Runnable { } // TODO: Increment statistics (Metrics) failed counters when requests fail - public void run() { + public Pair<GetConfigContext, Long> getConfig(JRTServerConfigRequest request) { //Request has already been detached if ( ! request.validateParameters()) { // Error code is set in verifyParameters if parameters are not OK. log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); respond(request); - return; + return null; } Trace trace = request.getRequestTrace(); if (logDebug(trace)) { @@ -85,13 +86,13 @@ class GetConfigProcessor implements Runnable { // fabricate an empty request to cause the sentinel to stop all running services if (rpcServer.isHostedVespa() && rpcServer.allTenantsLoaded() && !tenant.isPresent() && isSentinelConfigRequest(request)) { returnEmpty(request); - return; + return null; } GetConfigContext context = rpcServer.createGetConfigContext(tenant, request, trace); if (context == null || ! context.requestHandler().hasApplication(context.applicationId(), Optional.empty())) { handleError(request, ErrorCode.APPLICATION_NOT_LOADED, "No application exists"); - return; + return null; } Optional<Version> vespaVersion = rpcServer.useRequestVersion() ? @@ -103,7 +104,7 @@ class GetConfigProcessor implements Runnable { if ( ! context.requestHandler().hasApplication(context.applicationId(), vespaVersion)) { handleError(request, ErrorCode.UNKNOWN_VESPA_VERSION, "Unknown Vespa version in request: " + getPrintableVespaVersion(vespaVersion)); - return; + return null; } this.logPre = TenantRepository.logPre(context.applicationId()); @@ -112,14 +113,14 @@ class GetConfigProcessor implements Runnable { config = rpcServer.resolveConfig(request, context, vespaVersion); } catch (UnknownConfigDefinitionException e) { handleError(request, ErrorCode.UNKNOWN_DEFINITION, "Unknown config definition " + request.getConfigKey()); - return; + return null; } catch (UnknownConfigIdException e) { handleError(request, ErrorCode.ILLEGAL_CONFIGID, "Illegal config id " + request.getConfigKey().getConfigId()); - return; + return null; } catch (Throwable e) { log.log(Level.SEVERE, "Unexpected error handling config request", e); handleError(request, ErrorCode.INTERNAL_ERROR, "Internal error " + e.getMessage()); - return; + return null; } // config == null is not an error, but indicates that the config will be returned later. @@ -134,7 +135,21 @@ class GetConfigProcessor implements Runnable { if (logDebug(trace)) { debugLog(trace, "delaying response " + request.getShortDescription()); } - rpcServer.delayResponse(request, context); + return new Pair<>(context, config != null ? config.getGeneration() : 0); + } + return null; + } + @Override + public void run() { + Pair<GetConfigContext, Long> delayed = getConfig(request); + + if (delayed != null) { + rpcServer.delayResponse(request, delayed.getFirst()); + if (rpcServer.hasNewerGeneration(delayed.getFirst().applicationId(), delayed.getSecond())) { + // This will ensure that if the reload train left the station while I was boarding, another train will + // immediately be scheduled. + rpcServer.configReloaded(delayed.getFirst().applicationId()); + } } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 1cae6ac2244..c16303fbc88 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -53,7 +53,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorCompletionService; @@ -61,6 +60,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import java.util.stream.Stream; @@ -73,9 +73,9 @@ import java.util.stream.Stream; // TODO: Split business logic out of this public class RpcServer implements Runnable, ReloadListener, TenantListener { - public static final String getConfigMethodName = "getConfigV3"; + static final String getConfigMethodName = "getConfigV3"; - static final int TRACELEVEL = 6; + private static final int TRACELEVEL = 6; static final int TRACELEVEL_DEBUG = 9; private static final String THREADPOOL_NAME = "rpcserver worker pool"; private static final long SHUTDOWN_TIMEOUT = 60; @@ -87,10 +87,11 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { private static final Logger log = Logger.getLogger(RpcServer.class.getName()); - final DelayedConfigResponses delayedConfigResponses; + private final DelayedConfigResponses delayedConfigResponses; private final HostRegistry<TenantName> hostRegistry; private final Map<TenantName, TenantHandlerProvider> tenantProviders = new ConcurrentHashMap<>(); + private final Map<ApplicationId, ApplicationState> applicationStateMap = new ConcurrentHashMap<>(); private final SuperModelRequestHandler superModelRequestHandler; private final MetricUpdater metrics; private final MetricUpdaterFactory metricUpdaterFactory; @@ -102,6 +103,14 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { private volatile boolean allTenantsLoaded = false; private boolean isRunning = false; + class ApplicationState { + private final AtomicLong activeGeneration = new AtomicLong(0); + ApplicationState(long generation) { + activeGeneration.set(generation); + } + long getActiveGeneration() { return activeGeneration.get(); } + void setActiveGeneration(long generation) { activeGeneration.set(generation); } + } /** * Creates an RpcServer listening on the specified <code>port</code>. * @@ -114,7 +123,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { this.superModelRequestHandler = superModelRequestHandler; metricUpdaterFactory = metrics; supervisor.setMaxOutputBufferSize(config.maxoutputbuffersize()); - this.metrics = metrics.getOrCreateMetricUpdater(Collections.<String, String>emptyMap()); + this.metrics = metrics.getOrCreateMetricUpdater(Collections.emptyMap()); this.hostLivenessTracker = hostLivenessTracker; BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(config.maxgetconfigclients()); int numberOfRpcThreads = (config.numRpcThreads() == 0) ? Runtime.getRuntime().availableProcessors() : config.numRpcThreads(); @@ -161,7 +170,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { * * @param req a Request */ - public final void printStatistics(Request req) { + @SuppressWarnings("UnusedDeclaration") + public void printStatistics(Request req) { req.returnValues().add(new StringValue("Delayed responses queue size: " + delayedConfigResponses.size())); } @@ -214,6 +224,17 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { .returnDesc(0, "ret", "0 if success, 1 otherwise")); } + private ApplicationState getState(ApplicationId id) { + ApplicationState state = applicationStateMap.get(id); + if (state == null) { + applicationStateMap.putIfAbsent(id, new ApplicationState(0)); + state = applicationStateMap.get(id); + } + return state; + } + boolean hasNewerGeneration(ApplicationId id, long generation) { + return getState(id).getActiveGeneration() > generation; + } /** * Checks all delayed responses for config changes and waits until all has been answered. * This method should be called when config is reloaded in the server. @@ -221,16 +242,20 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @Override public void configActivated(ApplicationSet applicationSet) { ApplicationId applicationId = applicationSet.getId(); - configReloaded(delayedConfigResponses.drainQueue(applicationId), TenantRepository.logPre(applicationId)); + ApplicationState state = getState(applicationId); + state.setActiveGeneration(applicationSet.getApplicationGeneration()); + configReloaded(applicationId); reloadSuperModel(applicationSet); } private void reloadSuperModel(ApplicationSet applicationSet) { superModelRequestHandler.reloadConfig(applicationSet); - configReloaded(delayedConfigResponses.drainQueue(ApplicationId.global()), TenantRepository.logPre(ApplicationId.global())); + configReloaded(ApplicationId.global()); } - private void configReloaded(List<DelayedConfigResponses.DelayedConfigResponse> responses, String logPre) { + void configReloaded(ApplicationId applicationId) { + List<DelayedConfigResponses.DelayedConfigResponse> responses = delayedConfigResponses.drainQueue(applicationId); + String logPre = TenantRepository.logPre(applicationId); if (log.isLoggable(LogLevel.DEBUG)) { log.log(LogLevel.DEBUG, logPre + "Start of configReload: " + responses.size() + " requests on delayed requests queue"); } @@ -285,8 +310,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { @Override public void applicationRemoved(ApplicationId applicationId) { superModelRequestHandler.removeApplication(applicationId); - configReloaded(delayedConfigResponses.drainQueue(applicationId), TenantRepository.logPre(applicationId)); - configReloaded(delayedConfigResponses.drainQueue(ApplicationId.global()), TenantRepository.logPre(ApplicationId.global())); + configReloaded(applicationId); + configReloaded(ApplicationId.global()); } public void respond(JRTServerConfigRequest request) { @@ -300,7 +325,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { * Returns the tenant for this request, empty if there is no tenant for this request * (which on hosted Vespa means that the requesting host is not currently active for any tenant) */ - public Optional<TenantName> resolveTenant(JRTServerConfigRequest request, Trace trace) { + Optional<TenantName> resolveTenant(JRTServerConfigRequest request, Trace trace) { if ("*".equals(request.getConfigKey().getConfigId())) return Optional.of(ApplicationId.global().tenant()); String hostname = request.getClientHostName(); TenantName tenant = hostRegistry.getKeyForHost(hostname); @@ -321,12 +346,12 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { return context.requestHandler().resolveConfig(context.applicationId(), request, vespaVersion); } - protected Supervisor getSupervisor() { + private Supervisor getSupervisor() { return supervisor; } - Boolean addToRequestQueue(JRTServerConfigRequest request) { - return addToRequestQueue(request, false, null); + private void addToRequestQueue(JRTServerConfigRequest request) { + addToRequestQueue(request, false, null); } public Boolean addToRequestQueue(JRTServerConfigRequest request, boolean forceResponse, CompletionService<Boolean> completionService) { @@ -338,13 +363,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { if (completionService == null) { executorService.submit(task); } else { - completionService.submit(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - task.run(); - return true; - } - }); + completionService.submit(() -> { task.run();return true;}); } updateWorkQueueMetrics(); return true; @@ -363,7 +382,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { /** * Returns the context for this request, or null if the server is not properly set up with handlers */ - public GetConfigContext createGetConfigContext(Optional<TenantName> optionalTenant, JRTServerConfigRequest request, Trace trace) { + GetConfigContext createGetConfigContext(Optional<TenantName> optionalTenant, JRTServerConfigRequest request, Trace trace) { if ("*".equals(request.getConfigKey().getConfigId())) { return GetConfigContext.create(ApplicationId.global(), superModelRequestHandler, trace); } @@ -394,16 +413,14 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { return tenantProviders.get(tenant).getRequestHandler(); } - public void delayResponse(JRTServerConfigRequest request, GetConfigContext context) { + void delayResponse(JRTServerConfigRequest request, GetConfigContext context) { delayedConfigResponses.delayResponse(request, context); } @Override public void onTenantDelete(TenantName tenant) { log.log(LogLevel.DEBUG, TenantRepository.logPre(tenant)+"Tenant deleted, removing request handler and cleaning host registry"); - if (tenantProviders.containsKey(tenant)) { - tenantProviders.remove(tenant); - } + tenantProviders.remove(tenant); hostRegistry.removeHostsForKey(tenant); } @@ -540,4 +557,5 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { new FileReferenceDownload(fileReference, false /* downloadFromOtherSourceIfNotFound */))); req.returnValues().add(new Int32Value(0)); } + } |