summaryrefslogtreecommitdiffstats
path: root/container-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-11 11:20:36 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-11 11:20:36 +0100
commitd2e457ca7523c32825402eedf0052576034feccf (patch)
tree2d14ba09fcd09a53bddb4880cacc2e9a3c633262 /container-core
parent2a23dab552cb9fc98eb54d7351782ce01f171712 (diff)
Use a thread executor to fire off subcribes to multiple keys concurrently.
Diffstat (limited to 'container-core')
-rw-r--r--container-core/src/main/java/com/yahoo/container/di/CloudSubscriber.java13
-rw-r--r--container-core/src/main/java/com/yahoo/container/di/CloudSubscriberFactory.java21
-rw-r--r--container-core/src/main/java/com/yahoo/container/di/config/SubscriberFactory.java2
3 files changed, 32 insertions, 4 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/di/CloudSubscriber.java b/container-core/src/main/java/com/yahoo/container/di/CloudSubscriber.java
index 5cbb009d54b..b2e1196e9f8 100644
--- a/container-core/src/main/java/com/yahoo/container/di/CloudSubscriber.java
+++ b/container-core/src/main/java/com/yahoo/container/di/CloudSubscriber.java
@@ -10,6 +10,9 @@ import com.yahoo.vespa.config.ConfigKey;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.logging.Logger;
import static java.util.logging.Level.FINE;
@@ -29,10 +32,16 @@ public class CloudSubscriber implements Subscriber {
// if waitNextGeneration has not yet been called, -1 should be returned
private long generation = -1L;
- CloudSubscriber(String name, ConfigSource configSource, Set<ConfigKey<ConfigInstance>> keys) {
+ CloudSubscriber(ExecutorService executor, String name, ConfigSource configSource, Set<ConfigKey<ConfigInstance>> keys) {
this.name = name;
this.subscriber = new ConfigSubscriber(configSource);
- keys.forEach(k -> handles.put(k, subscriber.subscribe(k.getConfigClass(), k.getConfigId())));
+ Map<ConfigKey<ConfigInstance>, Future<ConfigHandle<ConfigInstance>>> futureHandles = new HashMap<>();
+ keys.forEach(k -> futureHandles.put(k, executor.submit(() -> subscriber.subscribe(k.getConfigClass(), k.getConfigId()))));
+ futureHandles.forEach((k, f) -> {
+ try {
+ handles.put(k, f.get());
+ } catch (InterruptedException | ExecutionException e) {}
+ });
}
@Override
diff --git a/container-core/src/main/java/com/yahoo/container/di/CloudSubscriberFactory.java b/container-core/src/main/java/com/yahoo/container/di/CloudSubscriberFactory.java
index a6327b01e21..c215e4d82a0 100644
--- a/container-core/src/main/java/com/yahoo/container/di/CloudSubscriberFactory.java
+++ b/container-core/src/main/java/com/yahoo/container/di/CloudSubscriberFactory.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.di;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.ConfigInstance;
import com.yahoo.config.subscription.ConfigSource;
import com.yahoo.config.subscription.ConfigSourceSet;
@@ -15,6 +16,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* @author Tony Vaagenes
@@ -24,11 +29,25 @@ public class CloudSubscriberFactory implements SubscriberFactory {
private final ConfigSource configSource;
private final Map<CloudSubscriber, Integer> activeSubscribers = new WeakHashMap<>();
+ private final ExecutorService executor;
private Optional<Long> testGeneration = Optional.empty();
public CloudSubscriberFactory(ConfigSource configSource) {
this.configSource = configSource;
+ executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
+ 1, TimeUnit.SECONDS, new SynchronousQueue<>(),
+ new DaemonThreadFactory("cloud-subscriber-factory"));
+ }
+
+ @Override
+ public void close() {
+ executor.shutdown();
+ try {
+ if ( ! executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) { }
}
@Override
@@ -39,7 +58,7 @@ public class CloudSubscriberFactory implements SubscriberFactory {
ConfigKey<ConfigInstance> invariant = (ConfigKey<ConfigInstance>) key;
subscriptionKeys.add(invariant);
}
- CloudSubscriber subscriber = new CloudSubscriber(name, configSource, subscriptionKeys);
+ CloudSubscriber subscriber = new CloudSubscriber(executor, name, configSource, subscriptionKeys);
testGeneration.ifPresent(subscriber.getSubscriber()::reload); // TODO: test specific code, remove
activeSubscribers.put(subscriber, 0);
diff --git a/container-core/src/main/java/com/yahoo/container/di/config/SubscriberFactory.java b/container-core/src/main/java/com/yahoo/container/di/config/SubscriberFactory.java
index f11f44413c7..bfb649fa82e 100644
--- a/container-core/src/main/java/com/yahoo/container/di/config/SubscriberFactory.java
+++ b/container-core/src/main/java/com/yahoo/container/di/config/SubscriberFactory.java
@@ -16,5 +16,5 @@ public interface SubscriberFactory {
Subscriber getSubscriber(Set<? extends ConfigKey<?>> configKeys, String name);
void reloadActiveSubscribers(long generation);
-
+ void close();
}