summaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-02-21 12:18:04 +0100
committerHarald Musum <musum@verizonmedia.com>2020-02-21 12:18:04 +0100
commit8dafc34949693548a5a30566112f306c613b7922 (patch)
treedabfc8ae1c47e9d9ef2482115956c02cde1de305 /config-proxy
parent25a3eb4ce927bd5ea1225bfb77b69ce3512b0647 (diff)
Move delayed responses and scheduler into RpcConfigSourceClient
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java29
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java23
2 files changed, 23 insertions, 29 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
index 72c8db28b21..fc1c7f62048 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
-import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
@@ -15,14 +14,10 @@ import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownl
import com.yahoo.yolean.system.CatchSignals;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT;
-import static java.util.concurrent.TimeUnit.SECONDS;
/**
* A proxy server that handles RPC config requests. The proxy can run in two modes:
@@ -39,14 +34,9 @@ public class ProxyServer implements Runnable {
private final static Logger log = Logger.getLogger(ProxyServer.class.getName());
private final AtomicBoolean signalCaught = new AtomicBoolean(false);
-
- // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
private final Supervisor supervisor = new Supervisor(new Transport(JRT_TRANSPORT_THREADS));
- private ScheduledFuture<?> delayedResponseScheduler;
private final ConfigProxyRpcServer rpcServer;
- final DelayedResponses delayedResponses;
private ConfigSourceSet configSource;
private volatile ConfigSourceClient configClient;
@@ -57,12 +47,11 @@ public class ProxyServer implements Runnable {
private volatile Mode mode = new Mode(DEFAULT);
ProxyServer(Spec spec, ConfigSourceSet source, MemoryCache memoryCache, ConfigSourceClient configClient) {
- this.delayedResponses = new DelayedResponses();
this.configSource = source;
log.log(LogLevel.DEBUG, "Using config source '" + source);
this.memoryCache = memoryCache;
this.rpcServer = createRpcServer(spec);
- this.configClient = (configClient == null) ? createRpcClient(rpcServer, delayedResponses, source, memoryCache) : configClient;
+ this.configClient = (configClient == null) ? createRpcClient(rpcServer, source, memoryCache) : configClient;
this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source);
}
@@ -73,11 +62,6 @@ public class ProxyServer implements Runnable {
t.setName("RpcServer");
t.start();
}
- // Wait for 5 seconds initially, then run every second
- delayedResponseScheduler = scheduler.scheduleAtFixedRate(new DelayedResponseHandler(delayedResponses,
- memoryCache,
- rpcServer),
- 5, 1, SECONDS);
}
RawConfig resolveConfig(JRTServerConfigRequest req) {
@@ -109,7 +93,7 @@ public class ProxyServer implements Runnable {
break;
case DEFAULT:
flush();
- configClient = createRpcClient(rpcServer, delayedResponses, configSource, memoryCache);
+ configClient = createRpcClient(rpcServer, configSource, memoryCache);
this.mode = new Mode(modeName);
break;
default:
@@ -122,9 +106,8 @@ public class ProxyServer implements Runnable {
return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this'
}
- private RpcConfigSourceClient createRpcClient(RpcServer rpcServer, DelayedResponses delayedResponses,
- ConfigSourceSet source, MemoryCache memoryCache) {
- return new RpcConfigSourceClient(rpcServer, source, memoryCache, delayedResponses);
+ private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source, MemoryCache memoryCache) {
+ return new RpcConfigSourceClient(rpcServer, source, memoryCache);
}
private void setupSignalHandler() {
@@ -191,7 +174,7 @@ public class ProxyServer implements Runnable {
void stop() {
Event.stopping("configproxy", "shutdown");
if (rpcServer != null) rpcServer.shutdown();
- if (delayedResponseScheduler != null) delayedResponseScheduler.cancel(true);
+ if (configClient != null) configClient.cancel();
flush();
fileDistributionAndUrlDownload.close();
}
@@ -211,7 +194,7 @@ public class ProxyServer implements Runnable {
void updateSourceConnections(List<String> sources) {
configSource = new ConfigSourceSet(sources);
flush();
- configClient = createRpcClient(rpcServer, delayedResponses, configSource, memoryCache);
+ configClient = createRpcClient(rpcServer, configSource, memoryCache);
}
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index a674a43be8d..94d491b964d 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -22,8 +22,12 @@ import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.logging.Logger;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
/**
* An Rpc client to a config source
*
@@ -43,9 +47,11 @@ class RpcConfigSourceClient implements ConfigSourceClient {
private final MemoryCache memoryCache;
private final DelayedResponses delayedResponses;
private final static TimingValues timingValues;
-
private final ExecutorService exec;
private final JRTConfigRequester requester;
+ // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
+ private ScheduledFuture<?> delayedResponseScheduler;
static {
// Proxy should time out before clients upon subscription.
@@ -57,17 +63,20 @@ class RpcConfigSourceClient implements ConfigSourceClient {
timingValues = tv;
}
- RpcConfigSourceClient(RpcServer rpcServer,
- ConfigSourceSet configSourceSet,
- MemoryCache memoryCache,
- DelayedResponses delayedResponses) {
+ RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) {
this.rpcServer = rpcServer;
this.configSourceSet = configSourceSet;
this.memoryCache = memoryCache;
- this.delayedResponses = delayedResponses;
+ this.delayedResponses = new DelayedResponses();
checkConfigSources();
exec = Executors.newCachedThreadPool(new DaemonThreadFactory("subscriber-"));
requester = JRTConfigRequester.create(configSourceSet, timingValues);
+ // Wait for 5 seconds initially, then run every second
+ delayedResponseScheduler = scheduler.scheduleAtFixedRate(
+ new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer),
+ 5,
+ 1,
+ SECONDS);
}
/**
@@ -166,6 +175,8 @@ class RpcConfigSourceClient implements ConfigSourceClient {
@Override
public void cancel() {
shutdownSourceConnections();
+ delayedResponseScheduler.cancel(true);
+ scheduler.shutdown();
}
/**