summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2021-01-26 12:06:55 +0100
committerGitHub <noreply@github.com>2021-01-26 12:06:55 +0100
commit0e07695b7756c9aa87914522a327f42974585624 (patch)
treecba8902998348314ffcdeca5ccfb7238f031bce6 /configserver
parent1dbeb6633446d5df0a00b4d8e4229f9eac9d4521 (diff)
parent3ac065e4c770f0b985f919a379a46b1b74decefb (diff)
Merge pull request #16216 from vespa-engine/hmusum/load-sessions-in-parallel
Load sessions in parallel
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java51
1 files changed, 41 insertions, 10 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index b5ba5b769d9..ca28b04264d 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.config.server.session;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.yahoo.cloud.config.ConfigserverConfig;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.concurrent.StripedExecutor;
import com.yahoo.config.FileReference;
import com.yahoo.config.application.api.ApplicationPackage;
@@ -42,6 +43,7 @@ import com.yahoo.vespa.config.server.zookeeper.SessionCounter;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.defaults.Defaults;
import com.yahoo.vespa.flags.FlagSource;
+import com.yahoo.yolean.Exceptions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -65,8 +67,12 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -164,8 +170,17 @@ public class SessionRepository {
}
private void loadSessions() {
- loadLocalSessions();
- loadRemoteSessions();
+ ExecutorService executor = Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()),
+ new DaemonThreadFactory("load-sessions-"));
+ loadLocalSessions(executor);
+ loadRemoteSessions(executor);
+ try {
+ executor.shutdown();
+ if ( ! executor.awaitTermination(1, TimeUnit.MINUTES))
+ log.log(Level.INFO, "Executor did not terminate");
+ } catch (InterruptedException e) {
+ log.log(Level.WARNING, "Shutdown of executor for loading sessions failed: " + Exceptions.toMessageString(e));
+ }
}
// ---------------- Local sessions ----------------------------------------------------------------
@@ -185,18 +200,23 @@ public class SessionRepository {
return localSessionCache.values();
}
- private void loadLocalSessions() {
+ private void loadLocalSessions(ExecutorService executor) {
File[] sessions = tenantFileSystemDirs.sessionsPath().listFiles(sessionApplicationsFilter);
if (sessions == null) return;
+ Map<Long, Future<?>> futures = new HashMap<>();
for (File session : sessions) {
+ long sessionId = Long.parseLong(session.getName());
+ futures.put(sessionId, executor.submit(() -> createSessionFromId(sessionId)));
+ }
+ futures.forEach((sessionId, future) -> {
try {
- createSessionFromId(Long.parseLong(session.getName()));
- } catch (IllegalArgumentException e) {
- log.log(Level.WARNING, "Could not load session '" +
- session.getAbsolutePath() + "':" + e.getMessage() + ", skipping it.");
+ future.get();
+ log.log(Level.INFO, () -> "Local session " + sessionId + " loaded");
+ } catch (ExecutionException | InterruptedException e) {
+ log.log(Level.WARNING, "Could not load session " + sessionId, e);
}
- }
+ });
}
public ConfigChangeActions prepareLocalSession(Session session, DeployLogger logger, PrepareParams params, Instant now) {
@@ -343,8 +363,19 @@ public class SessionRepository {
return children.stream().map(Long::parseLong).collect(Collectors.toList());
}
- private void loadRemoteSessions() throws NumberFormatException {
- getRemoteSessionsFromZooKeeper().forEach(this::sessionAdded);
+ private void loadRemoteSessions(ExecutorService executor) throws NumberFormatException {
+ Map<Long, Future<?>> futures = new HashMap<>();
+ for (long sessionId : getRemoteSessionsFromZooKeeper()) {
+ futures.put(sessionId, executor.submit(() -> sessionAdded(sessionId)));
+ }
+ futures.forEach((sessionId, future) -> {
+ try {
+ future.get();
+ log.log(Level.INFO, () -> "Remote session " + sessionId + " loaded");
+ } catch (ExecutionException | InterruptedException e) {
+ log.log(Level.WARNING, "Could not load session " + sessionId, e);
+ }
+ });
}
/**