diff options
author | HÃ¥kon Hallingstad <hakon@verizonmedia.com> | 2021-01-26 09:15:15 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-26 09:15:15 +0100 |
commit | f7e4a1308740c563e86abb1b13d831b923cf4858 (patch) | |
tree | 75890bb1d714604c84e12d1846fe90df90f8e2b9 | |
parent | 3e69b7fb65c409c590ddc7bbe40b59c7f4256699 (diff) | |
parent | 9e590d92051a0e63933fb1221d059d1284ab7f85 (diff) |
Merge pull request #16223 from vespa-engine/revert-16214-revert-16208-hmusum/use-synchronized-backing-for-sessions
Reapply "Use synchronized backing for sessions"
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java | 65 | ||||
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java | 7 |
2 files changed, 40 insertions, 32 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index ae8b424136f..b5ba5b769d9 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -57,12 +57,14 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.logging.Level; @@ -85,9 +87,9 @@ public class SessionRepository { private static final long nonExistingActiveSessionId = 0; private final Object monitor = new Object(); - private final Map<Long, LocalSession> localSessionCache = new ConcurrentHashMap<>(); - private final Map<Long, RemoteSession> remoteSessionCache = new ConcurrentHashMap<>(); - private final Map<Long, SessionStateWatcher> sessionStateWatchers = new HashMap<>(); + private final Map<Long, LocalSession> localSessionCache = Collections.synchronizedMap(new HashMap<>()); + private final Map<Long, RemoteSession> remoteSessionCache = Collections.synchronizedMap(new HashMap<>()); + private final Map<Long, SessionStateWatcher> sessionStateWatchers = Collections.synchronizedMap(new HashMap<>()); private final Duration sessionLifetime; private final Clock clock; private final Curator curator; @@ -168,12 +170,11 @@ public class SessionRepository { // ---------------- Local sessions ---------------------------------------------------------------- - public synchronized void addLocalSession(LocalSession session) { + public void addLocalSession(LocalSession session) { long sessionId = session.getSessionId(); localSessionCache.put(sessionId, session); - if (remoteSessionCache.get(sessionId) == null) { + if (remoteSessionCache.get(sessionId) == null) createRemoteSession(sessionId); - } } public LocalSession getLocalSession(long sessionId) { @@ -292,13 +293,13 @@ public class SessionRepository { return getSessionList(curator.getChildren(sessionsPath)); } - public synchronized RemoteSession createRemoteSession(long sessionId) { + public RemoteSession createRemoteSession(long sessionId) { SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient); - remoteSessionCache.put(sessionId, session); - loadSessionIfActive(session); - updateSessionStateWatcher(sessionId, session); - return session; + RemoteSession newSession = loadSessionIfActive(session).orElse(session); + remoteSessionCache.put(sessionId, newSession); + updateSessionStateWatcher(sessionId, newSession); + return newSession; } public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) { @@ -351,7 +352,7 @@ public class SessionRepository { * * @param sessionId session id for the new session */ - public synchronized void sessionAdded(long sessionId) { + public void sessionAdded(long sessionId) { if (hasStatusDeleted(sessionId)) return; log.log(Level.FINE, () -> "Adding remote session " + sessionId); @@ -392,22 +393,16 @@ public class SessionRepository { } } - private void sessionRemoved(long sessionId) { - SessionStateWatcher watcher = sessionStateWatchers.remove(sessionId); - if (watcher != null) watcher.close(); - remoteSessionCache.remove(sessionId); - metricUpdater.incRemovedSessions(); - } - - private void loadSessionIfActive(RemoteSession session) { + private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) { for (ApplicationId applicationId : applicationRepo.activeApplications()) { if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) { log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it"); applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId()); log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")"); - return; + return Optional.ofNullable(remoteSessionCache.get(session.getSessionId())); } } + return Optional.empty(); } void prepareRemoteSession(RemoteSession session) { @@ -528,6 +523,7 @@ public class SessionRepository { public void deleteExpiredSessions(Map<ApplicationId, Long> activeSessions) { log.log(Level.FINE, () -> "Purging old sessions for tenant '" + tenantName + "'"); + Set<LocalSession> toDelete = new HashSet<>(); try { for (LocalSession candidate : localSessionCache.values()) { Instant createTime = candidate.getCreateTime(); @@ -535,19 +531,22 @@ public class SessionRepository { // Sessions with state other than ACTIVATE if (hasExpired(candidate) && !isActiveSession(candidate)) { - deleteLocalSession(candidate); + toDelete.add(candidate); } else if (createTime.plus(Duration.ofDays(1)).isBefore(clock.instant())) { // Sessions with state ACTIVATE, but which are not actually active Optional<ApplicationId> applicationId = candidate.getOptionalApplicationId(); if (applicationId.isEmpty()) continue; Long activeSession = activeSessions.get(applicationId.get()); if (activeSession == null || activeSession != candidate.getSessionId()) { - deleteLocalSession(candidate); + toDelete.add(candidate); log.log(Level.INFO, "Deleted inactive session " + candidate.getSessionId() + " created " + createTime + " for '" + applicationId + "'"); } } } + + toDelete.forEach(this::deleteLocalSession); + // Make sure to catch here, to avoid executor just dying in case of issues ... } catch (Throwable e) { log.log(Level.WARNING, "Error when purging old sessions ", e); @@ -556,7 +555,7 @@ public class SessionRepository { } private boolean hasExpired(LocalSession candidate) { - return (candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant())); + return candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant()); } private boolean isActiveSession(LocalSession candidate) { @@ -782,16 +781,22 @@ public class SessionRepository { } } - private synchronized void sessionsChanged() throws NumberFormatException { + private void sessionsChanged() throws NumberFormatException { List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData()); checkForRemovedSessions(sessions); checkForAddedSessions(sessions); } - private void checkForRemovedSessions(List<Long> sessions) { - for (Session session : remoteSessionCache.values()) - if ( ! sessions.contains(session.getSessionId())) - sessionRemoved(session.getSessionId()); + private void checkForRemovedSessions(List<Long> existingSessions) { + for (Iterator<RemoteSession> it = remoteSessionCache.values().iterator(); it.hasNext(); ) { + long sessionId = it.next().sessionId; + if (existingSessions.contains(sessionId)) continue; + + SessionStateWatcher watcher = sessionStateWatchers.remove(sessionId); + if (watcher != null) watcher.close(); + it.remove(); + metricUpdater.incRemovedSessions(); + } } private void checkForAddedSessions(List<Long> sessions) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java index 392bc373c19..b95cc068308 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java @@ -172,7 +172,7 @@ public class TenantRepository { this.hostRegistry = hostRegistry; this.configserverConfig = configserverConfig; this.bootstrapExecutor = Executors.newFixedThreadPool(configserverConfig.numParallelTenantLoaders(), - new DaemonThreadFactory("bootstrap tenants")); + new DaemonThreadFactory("bootstrap-tenant-")); this.curator = curator; this.metrics = metrics; metricUpdater = metrics.getOrCreateMetricUpdater(Collections.emptyMap()); @@ -302,6 +302,8 @@ public class TenantRepository { private Tenant createTenant(TenantName tenantName, Instant created) { if (tenants.containsKey(tenantName)) return getTenant(tenantName); + Instant start = Instant.now(); + log.log(Level.FINE, "Adding tenant '" + tenantName); TenantApplications applicationRepo = new TenantApplications(tenantName, curator, @@ -342,7 +344,8 @@ public class TenantRepository { modelFactoryRegistry, configDefinitionRepo, tenantListener); - log.log(Level.INFO, "Adding tenant '" + tenantName + "'" + ", created " + created); + log.log(Level.INFO, "Adding tenant '" + tenantName + "'" + ", created " + created + + ". Bootstrapping in " + Duration.between(start, Instant.now())); Tenant tenant = new Tenant(tenantName, sessionRepository, applicationRepo, applicationRepo, created); createAndWriteTenantMetaData(tenant); tenants.putIfAbsent(tenantName, tenant); |