diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-11 11:20:36 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-11 11:20:36 +0100 |
commit | d2e457ca7523c32825402eedf0052576034feccf (patch) | |
tree | 2d14ba09fcd09a53bddb4880cacc2e9a3c633262 /container-core/src/main/java/com/yahoo/container | |
parent | 2a23dab552cb9fc98eb54d7351782ce01f171712 (diff) |
Use a thread executor to fire off subcribes to multiple keys concurrently.
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container')
3 files changed, 32 insertions, 4 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/di/CloudSubscriber.java b/container-core/src/main/java/com/yahoo/container/di/CloudSubscriber.java index 5cbb009d54b..b2e1196e9f8 100644 --- a/container-core/src/main/java/com/yahoo/container/di/CloudSubscriber.java +++ b/container-core/src/main/java/com/yahoo/container/di/CloudSubscriber.java @@ -10,6 +10,9 @@ import com.yahoo.vespa.config.ConfigKey; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.logging.Logger; import static java.util.logging.Level.FINE; @@ -29,10 +32,16 @@ public class CloudSubscriber implements Subscriber { // if waitNextGeneration has not yet been called, -1 should be returned private long generation = -1L; - CloudSubscriber(String name, ConfigSource configSource, Set<ConfigKey<ConfigInstance>> keys) { + CloudSubscriber(ExecutorService executor, String name, ConfigSource configSource, Set<ConfigKey<ConfigInstance>> keys) { this.name = name; this.subscriber = new ConfigSubscriber(configSource); - keys.forEach(k -> handles.put(k, subscriber.subscribe(k.getConfigClass(), k.getConfigId()))); + Map<ConfigKey<ConfigInstance>, Future<ConfigHandle<ConfigInstance>>> futureHandles = new HashMap<>(); + keys.forEach(k -> futureHandles.put(k, executor.submit(() -> subscriber.subscribe(k.getConfigClass(), k.getConfigId())))); + futureHandles.forEach((k, f) -> { + try { + handles.put(k, f.get()); + } catch (InterruptedException | ExecutionException e) {} + }); } @Override diff --git a/container-core/src/main/java/com/yahoo/container/di/CloudSubscriberFactory.java b/container-core/src/main/java/com/yahoo/container/di/CloudSubscriberFactory.java index a6327b01e21..c215e4d82a0 100644 --- a/container-core/src/main/java/com/yahoo/container/di/CloudSubscriberFactory.java +++ b/container-core/src/main/java/com/yahoo/container/di/CloudSubscriberFactory.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.di; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.ConfigInstance; import com.yahoo.config.subscription.ConfigSource; import com.yahoo.config.subscription.ConfigSourceSet; @@ -15,6 +16,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author Tony Vaagenes @@ -24,11 +29,25 @@ public class CloudSubscriberFactory implements SubscriberFactory { private final ConfigSource configSource; private final Map<CloudSubscriber, Integer> activeSubscribers = new WeakHashMap<>(); + private final ExecutorService executor; private Optional<Long> testGeneration = Optional.empty(); public CloudSubscriberFactory(ConfigSource configSource) { this.configSource = configSource; + executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, + 1, TimeUnit.SECONDS, new SynchronousQueue<>(), + new DaemonThreadFactory("cloud-subscriber-factory")); + } + + @Override + public void close() { + executor.shutdown(); + try { + if ( ! executor.awaitTermination(10, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { } } @Override @@ -39,7 +58,7 @@ public class CloudSubscriberFactory implements SubscriberFactory { ConfigKey<ConfigInstance> invariant = (ConfigKey<ConfigInstance>) key; subscriptionKeys.add(invariant); } - CloudSubscriber subscriber = new CloudSubscriber(name, configSource, subscriptionKeys); + CloudSubscriber subscriber = new CloudSubscriber(executor, name, configSource, subscriptionKeys); testGeneration.ifPresent(subscriber.getSubscriber()::reload); // TODO: test specific code, remove activeSubscribers.put(subscriber, 0); diff --git a/container-core/src/main/java/com/yahoo/container/di/config/SubscriberFactory.java b/container-core/src/main/java/com/yahoo/container/di/config/SubscriberFactory.java index f11f44413c7..bfb649fa82e 100644 --- a/container-core/src/main/java/com/yahoo/container/di/config/SubscriberFactory.java +++ b/container-core/src/main/java/com/yahoo/container/di/config/SubscriberFactory.java @@ -16,5 +16,5 @@ public interface SubscriberFactory { Subscriber getSubscriber(Set<? extends ConfigKey<?>> configKeys, String name); void reloadActiveSubscribers(long generation); - + void close(); } |