diff options
author | Harald Musum <musum@verizonmedia.com> | 2021-01-26 12:06:55 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-26 12:06:55 +0100 |
commit | 0e07695b7756c9aa87914522a327f42974585624 (patch) | |
tree | cba8902998348314ffcdeca5ccfb7238f031bce6 /configserver | |
parent | 1dbeb6633446d5df0a00b4d8e4229f9eac9d4521 (diff) | |
parent | 3ac065e4c770f0b985f919a379a46b1b74decefb (diff) |
Merge pull request #16216 from vespa-engine/hmusum/load-sessions-in-parallel
Load sessions in parallel
Diffstat (limited to 'configserver')
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java | 51 |
1 files changed, 41 insertions, 10 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index b5ba5b769d9..ca28b04264d 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.config.server.session; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; import com.yahoo.cloud.config.ConfigserverConfig; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.concurrent.StripedExecutor; import com.yahoo.config.FileReference; import com.yahoo.config.application.api.ApplicationPackage; @@ -42,6 +43,7 @@ import com.yahoo.vespa.config.server.zookeeper.SessionCounter; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.vespa.flags.FlagSource; +import com.yahoo.yolean.Exceptions; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -65,8 +67,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -164,8 +170,17 @@ public class SessionRepository { } private void loadSessions() { - loadLocalSessions(); - loadRemoteSessions(); + ExecutorService executor = Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()), + new DaemonThreadFactory("load-sessions-")); + loadLocalSessions(executor); + loadRemoteSessions(executor); + try { + executor.shutdown(); + if ( ! executor.awaitTermination(1, TimeUnit.MINUTES)) + log.log(Level.INFO, "Executor did not terminate"); + } catch (InterruptedException e) { + log.log(Level.WARNING, "Shutdown of executor for loading sessions failed: " + Exceptions.toMessageString(e)); + } } // ---------------- Local sessions ---------------------------------------------------------------- @@ -185,18 +200,23 @@ public class SessionRepository { return localSessionCache.values(); } - private void loadLocalSessions() { + private void loadLocalSessions(ExecutorService executor) { File[] sessions = tenantFileSystemDirs.sessionsPath().listFiles(sessionApplicationsFilter); if (sessions == null) return; + Map<Long, Future<?>> futures = new HashMap<>(); for (File session : sessions) { + long sessionId = Long.parseLong(session.getName()); + futures.put(sessionId, executor.submit(() -> createSessionFromId(sessionId))); + } + futures.forEach((sessionId, future) -> { try { - createSessionFromId(Long.parseLong(session.getName())); - } catch (IllegalArgumentException e) { - log.log(Level.WARNING, "Could not load session '" + - session.getAbsolutePath() + "':" + e.getMessage() + ", skipping it."); + future.get(); + log.log(Level.INFO, () -> "Local session " + sessionId + " loaded"); + } catch (ExecutionException | InterruptedException e) { + log.log(Level.WARNING, "Could not load session " + sessionId, e); } - } + }); } public ConfigChangeActions prepareLocalSession(Session session, DeployLogger logger, PrepareParams params, Instant now) { @@ -343,8 +363,19 @@ public class SessionRepository { return children.stream().map(Long::parseLong).collect(Collectors.toList()); } - private void loadRemoteSessions() throws NumberFormatException { - getRemoteSessionsFromZooKeeper().forEach(this::sessionAdded); + private void loadRemoteSessions(ExecutorService executor) throws NumberFormatException { + Map<Long, Future<?>> futures = new HashMap<>(); + for (long sessionId : getRemoteSessionsFromZooKeeper()) { + futures.put(sessionId, executor.submit(() -> sessionAdded(sessionId))); + } + futures.forEach((sessionId, future) -> { + try { + future.get(); + log.log(Level.INFO, () -> "Remote session " + sessionId + " loaded"); + } catch (ExecutionException | InterruptedException e) { + log.log(Level.WARNING, "Could not load session " + sessionId, e); + } + }); } /** |