diff options
author | Harald Musum <musum@verizonmedia.com> | 2021-01-25 17:49:31 +0100 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2021-01-25 17:49:31 +0100 |
commit | 7d0bf449b7ba1dac2a90def09d03b2b2b2e28f9f (patch) | |
tree | 1c33bc85bd8338eb6683b407adebf8e15448fd3a | |
parent | b2d8ce1a2da83e8fc011a16189ee9e1b4a6587ae (diff) |
Use synchronized collections for cached and watchers
Avoid locking on instance level by using synchronized collections
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index ae8b424136f..ecd05e6e1fe 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -57,12 +57,14 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.logging.Level; @@ -85,9 +87,9 @@ public class SessionRepository { private static final long nonExistingActiveSessionId = 0; private final Object monitor = new Object(); - private final Map<Long, LocalSession> localSessionCache = new ConcurrentHashMap<>(); - private final Map<Long, RemoteSession> remoteSessionCache = new ConcurrentHashMap<>(); - private final Map<Long, SessionStateWatcher> sessionStateWatchers = new HashMap<>(); + private final Map<Long, LocalSession> localSessionCache = Collections.synchronizedMap(new HashMap<>()); + private final Map<Long, RemoteSession> remoteSessionCache = Collections.synchronizedMap(new HashMap<>()); + private final Map<Long, SessionStateWatcher> sessionStateWatchers = Collections.synchronizedMap(new HashMap<>()); private final Duration sessionLifetime; private final Clock clock; private final Curator curator; @@ -168,12 +170,10 @@ public class SessionRepository { // ---------------- Local sessions ---------------------------------------------------------------- - public synchronized void addLocalSession(LocalSession session) { + public void addLocalSession(LocalSession session) { long sessionId = session.getSessionId(); localSessionCache.put(sessionId, session); - if (remoteSessionCache.get(sessionId) == null) { - createRemoteSession(sessionId); - } + remoteSessionCache.putIfAbsent(sessionId, createRemoteSession(sessionId)); } public LocalSession getLocalSession(long sessionId) { @@ -292,7 +292,7 @@ public class SessionRepository { return getSessionList(curator.getChildren(sessionsPath)); } - public synchronized RemoteSession createRemoteSession(long sessionId) { + public RemoteSession createRemoteSession(long sessionId) { SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient); remoteSessionCache.put(sessionId, session); @@ -351,7 +351,7 @@ public class SessionRepository { * * @param sessionId session id for the new session */ - public synchronized void sessionAdded(long sessionId) { + public void sessionAdded(long sessionId) { if (hasStatusDeleted(sessionId)) return; log.log(Level.FINE, () -> "Adding remote session " + sessionId); @@ -392,13 +392,6 @@ public class SessionRepository { } } - private void sessionRemoved(long sessionId) { - SessionStateWatcher watcher = sessionStateWatchers.remove(sessionId); - if (watcher != null) watcher.close(); - remoteSessionCache.remove(sessionId); - metricUpdater.incRemovedSessions(); - } - private void loadSessionIfActive(RemoteSession session) { for (ApplicationId applicationId : applicationRepo.activeApplications()) { if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) { @@ -528,6 +521,7 @@ public class SessionRepository { public void deleteExpiredSessions(Map<ApplicationId, Long> activeSessions) { log.log(Level.FINE, () -> "Purging old sessions for tenant '" + tenantName + "'"); + Set<LocalSession> toDelete = new HashSet<>(); try { for (LocalSession candidate : localSessionCache.values()) { Instant createTime = candidate.getCreateTime(); @@ -535,19 +529,22 @@ public class SessionRepository { // Sessions with state other than ACTIVATE if (hasExpired(candidate) && !isActiveSession(candidate)) { - deleteLocalSession(candidate); + toDelete.add(candidate); } else if (createTime.plus(Duration.ofDays(1)).isBefore(clock.instant())) { // Sessions with state ACTIVATE, but which are not actually active Optional<ApplicationId> applicationId = candidate.getOptionalApplicationId(); if (applicationId.isEmpty()) continue; Long activeSession = activeSessions.get(applicationId.get()); if (activeSession == null || activeSession != candidate.getSessionId()) { - deleteLocalSession(candidate); + toDelete.add(candidate); log.log(Level.INFO, "Deleted inactive session " + candidate.getSessionId() + " created " + createTime + " for '" + applicationId + "'"); } } } + + toDelete.forEach(this::deleteLocalSession); + // Make sure to catch here, to avoid executor just dying in case of issues ... } catch (Throwable e) { log.log(Level.WARNING, "Error when purging old sessions ", e); @@ -556,7 +553,7 @@ public class SessionRepository { } private boolean hasExpired(LocalSession candidate) { - return (candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant())); + return candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant()); } private boolean isActiveSession(LocalSession candidate) { @@ -782,16 +779,22 @@ public class SessionRepository { } } - private synchronized void sessionsChanged() throws NumberFormatException { + private void sessionsChanged() throws NumberFormatException { List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData()); checkForRemovedSessions(sessions); checkForAddedSessions(sessions); } - private void checkForRemovedSessions(List<Long> sessions) { - for (Session session : remoteSessionCache.values()) - if ( ! sessions.contains(session.getSessionId())) - sessionRemoved(session.getSessionId()); + private void checkForRemovedSessions(List<Long> existingSessions) { + for (Iterator<RemoteSession> it = remoteSessionCache.values().iterator(); it.hasNext(); ) { + long sessionId = it.next().sessionId; + if (existingSessions.contains(sessionId)) continue; + + SessionStateWatcher watcher = sessionStateWatchers.remove(sessionId); + if (watcher != null) watcher.close(); + it.remove(); + metricUpdater.incRemovedSessions(); + } } private void checkForAddedSessions(List<Long> sessions) { |