diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-08-17 16:07:26 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-08-17 16:07:26 +0200 |
commit | a7f54ac5c18c9deaa45d870c743da2c9a38a6208 (patch) | |
tree | 0456aaf1ace25f87019d3fe509b71a4a7b2b7e61 /container-disc/src | |
parent | 3c56ab8b0ffe6588183809f331bb2f8cf02235ab (diff) |
Handle rpc port changes live.
Diffstat (limited to 'container-disc/src')
-rw-r--r-- | container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java | 96 |
1 files changed, 59 insertions, 37 deletions
diff --git a/container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java b/container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java index 7782e54dc90..e5eb7fdac33 100644 --- a/container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java +++ b/container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java @@ -56,7 +56,6 @@ import java.util.Collections; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicBoolean; @@ -85,7 +84,7 @@ public final class ConfiguredApplication implements Application { // Subscriber that is used when this is not a standalone-container. Subscribes // to config to make sure that container will be registered in slobrok (by {@link com.yahoo.jrt.slobrok.api.Register}) // if slobrok config changes (typically slobroks moving to other nodes) - private final Optional<SlobrokConfigSubscriber> slobrokConfigSubscriber; + private final SlobrokConfigSubscriber slobrokConfigSubscriber; private final ShutdownDeadline shutdownDeadline; //TODO: FilterChainRepository should instead always be set up in the model. @@ -151,8 +150,8 @@ public final class ConfiguredApplication implements Application { this.metric = metric; this.configId = System.getProperty("config.id"); this.slobrokConfigSubscriber = (subscriberFactory instanceof CloudSubscriberFactory) - ? Optional.of(new SlobrokConfigSubscriber(configId)) - : Optional.empty(); + ? new SlobrokConfigSubscriber(configId) + : null; this.restrictedOsgiFramework = new DisableOsgiFramework(new RestrictedBundleContext(osgiFramework.bundleContext())); this.shutdownDeadline = new ShutdownDeadline(configId); this.reconfigurerThread = new Thread(this::doReconfigurationLoop, "configured-application-reconfigurer"); @@ -161,8 +160,8 @@ public final class ConfiguredApplication implements Application { @Override public void start() { - qrConfig = getConfig(QrConfig.class, true); - reconfigure(qrConfig); + qrConfig = getConfig(QrConfig.class); + reconfigure(qrConfig.shutdown()); hackToInitializeServer(qrConfig); ContainerBuilder builder = createBuilderWithGuiceBindings(); @@ -173,27 +172,37 @@ public final class ConfiguredApplication implements Application { portWatcher.setDaemon(true); portWatcher.start(); - if (setupRpc()) { - slobrokRegistrator = registerInSlobrok(qrConfig); // marks this as up - } + setupRpc(qrConfig); } - private boolean setupRpc() { - if ( ! qrConfig.rpc().enabled()) return false; + private synchronized void setupRpc(QrConfig cfg) { + if (!cfg.rpc().enabled()) return; supervisor = new Supervisor(new Transport("configured-application")).setDropEmptyBuffers(true); supervisor.addMethod(new Method("prepareStop", "d", "", this::prepareStop)); - Spec listenSpec = new Spec(qrConfig.rpc().port()); + listenRpc(cfg); + } + + private synchronized void listenRpc(QrConfig cfg) { + Spec listenSpec = new Spec(cfg.rpc().port()); try { acceptor = supervisor.listen(listenSpec); - return true; + slobrokRegistrator = registerInSlobrok(cfg, acceptor.port()); } catch (ListenFailedException e) { throw new RuntimeException("Could not create rpc server listening on " + listenSpec, e); } } - private Register registerInSlobrok(QrConfig qrConfig) { + private void reListenRpc(QrConfig cfg) { + unregisterInSlobrok(); + if (supervisor == null) { + setupRpc(cfg); + } else if (cfg.rpc().enabled()) { + listenRpc(cfg); + } + } + private Register registerInSlobrok(QrConfig qrConfig, int port) { SlobrokList slobrokList = getSlobrokList(); - Spec mySpec = new Spec(HostName.getLocalhost(), acceptor.port()); + Spec mySpec = new Spec(HostName.getLocalhost(), port); Register slobrokRegistrator = new Register(supervisor, slobrokList, mySpec); slobrokRegistrator.registerName(qrConfig.rpc().slobrokId()); log.log(Level.INFO, "Registered name '" + qrConfig.rpc().slobrokId() + @@ -205,23 +214,25 @@ public final class ConfiguredApplication implements Application { // or need to get the config directly (standalone container) private SlobrokList getSlobrokList() { SlobrokList slobrokList; - if (slobrokConfigSubscriber.isPresent()) { - slobrokList = slobrokConfigSubscriber.get().getSlobroks(); + if (slobrokConfigSubscriber != null) { + slobrokList = slobrokConfigSubscriber.getSlobroks(); } else { slobrokList = new SlobrokList(); - SlobroksConfig slobrokConfig = getConfig(SlobroksConfig.class, true); + SlobroksConfig slobrokConfig = getConfig(SlobroksConfig.class); slobrokList.setup(slobrokConfig.slobrok().stream().map(SlobroksConfig.Slobrok::connectionspec).toArray(String[]::new)); } return slobrokList; } - private void unregisterInSlobrok() { - if (slobrokRegistrator != null) + private synchronized void unregisterInSlobrok() { + if (slobrokRegistrator != null) { slobrokRegistrator.shutdown(); - if (acceptor != null) + slobrokRegistrator = null; + } + if (acceptor != null) { acceptor.shutdown().join(); - if (supervisor != null) - supervisor.transport().shutdown().join(); + acceptor = null; + } } private static void hackToInitializeServer(QrConfig config) { @@ -234,11 +245,11 @@ public final class ConfiguredApplication implements Application { } } - private <T extends ConfigInstance> T getConfig(Class<T> configClass, boolean isInitializing) { + private <T extends ConfigInstance> T getConfig(Class<T> configClass) { Subscriber subscriber = subscriberFactory.getSubscriber(Collections.singleton(new ConfigKey<>(configClass, configId)), configClass.getName()); try { - subscriber.waitNextGeneration(isInitializing); + subscriber.waitNextGeneration(true); return configClass.cast(first(subscriber.config().values())); } finally { subscriber.close(); @@ -251,24 +262,31 @@ public final class ConfiguredApplication implements Application { try { while (true) { subscriber.waitNextGeneration(false); - QrConfig newConfig = QrConfig.class.cast(first(subscriber.config().values())); - reconfigure(qrConfig); - if (qrConfig.rpc().port() != newConfig.rpc().port()) { - com.yahoo.protect.Process.logAndDie( - "Rpc port config has changed from " + - qrConfig.rpc().port() + " to " + newConfig.rpc().port() + - ". This we can not handle without a restart so we will just bail out."); + if (first(subscriber.config().values()) instanceof QrConfig newConfig) { + reconfigure(newConfig.shutdown()); + synchronized (this) { + if (qrConfig.rpc().port() != newConfig.rpc().port()) { + try { + reListenRpc(newConfig); + } catch (Throwable e) { + com.yahoo.protect.Process.logAndDie("Rpc port config has changed from " + + qrConfig.rpc().port() + " to " + newConfig.rpc().port() + + ", and we were not able to reconfigure so we will just bail out and restart.", e); + } + } + qrConfig = newConfig; + } + log.fine("Received new QrConfig :" + newConfig); } - log.fine("Received new QrConfig :" + newConfig); } } finally { subscriber.close(); } } - void reconfigure(QrConfig qrConfig) { - dumpHeapOnShutdownTimeout.set(qrConfig.shutdown().dumpHeapOnTimeout()); - shutdownTimeoutS.set(qrConfig.shutdown().timeout()); + private void reconfigure(QrConfig.Shutdown shutdown) { + dumpHeapOnShutdownTimeout.set(shutdown.dumpHeapOnTimeout()); + shutdownTimeoutS.set(shutdown.timeout()); } private void initializeAndActivateContainer(ContainerBuilder builder, Runnable cleanupTask) { @@ -454,9 +472,13 @@ public final class ConfiguredApplication implements Application { if (configurer != null) { configurer.shutdown(); } - slobrokConfigSubscriber.ifPresent(SlobrokConfigSubscriber::shutdown); + if (slobrokConfigSubscriber != null) { + slobrokConfigSubscriber.shutdown(); + } Container.get().shutdown(); unregisterInSlobrok(); + if (supervisor != null) + supervisor.transport().shutdown().join(); shutdownDeadline.cancel(); log.info("Destroy: Finished"); } |