diff options
author | Harald Musum <musum@yahooinc.com> | 2022-07-11 09:58:29 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2022-07-11 09:58:29 +0200 |
commit | c672a1e9bd85be5cb4f5994400707dd11a659122 (patch) | |
tree | ab7593de0b118025a6b0d32f773cef629e429935 /configserver | |
parent | 30632c6985e6e88db26d2188fbdd68942461084d (diff) |
Use session id instead of Session in SessionStateWatcher
Diffstat (limited to 'configserver')
2 files changed, 36 insertions, 42 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index 0d696ca9015..ecfee6f2b70 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -346,7 +346,7 @@ public class SessionRepository { RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient); loadSessionIfActive(session); remoteSessionCache.put(sessionId, session); - updateSessionStateWatcher(sessionId, session); + updateSessionStateWatcher(sessionId); return session; } @@ -375,9 +375,11 @@ public class SessionRepository { return deleted; } - public void deactivateAndUpdateCache(RemoteSession remoteSession) { - RemoteSession session = remoteSession.deactivated(); - remoteSessionCache.put(session.getSessionId(), session); + public void deactivateAndUpdateCache(long sessionId) { + var s = remoteSessionCache.get(sessionId); + if (s == null) return; + + remoteSessionCache.put(sessionId, s.deactivated()); } public void deleteRemoteSessionFromZooKeeper(Session session) { @@ -439,10 +441,10 @@ public class SessionRepository { return session.getStatus() == Session.Status.DELETE; } - void activate(RemoteSession session) { - long sessionId = session.getSessionId(); - // Might need to create local session first + void activate(long sessionId) { createLocalSessionFromDistributedApplicationPackage(sessionId); + RemoteSession session = remoteSessionCache.get(sessionId); + if (session == null) return; CompletionWaiter waiter = createSessionZooKeeperClient(sessionId).getActiveWaiter(); log.log(Level.FINE, () -> session.logPre() + "Activating " + sessionId); @@ -464,11 +466,13 @@ public class SessionRepository { } } - void prepareRemoteSession(RemoteSession session) { + void prepareRemoteSession(long sessionId) { // Might need to create local session first - createLocalSessionFromDistributedApplicationPackage(session.getSessionId()); + createLocalSessionFromDistributedApplicationPackage(sessionId); + RemoteSession session = remoteSessionCache.get(sessionId); + if (session == null) return; - SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId()); + SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(sessionId); CompletionWaiter waiter = sessionZooKeeperClient.getPrepareWaiter(); ensureApplicationLoaded(session); notifyCompletion(waiter); @@ -485,7 +489,7 @@ public class SessionRepository { RemoteSession activated = session.activated(applicationSet); long sessionId = activated.getSessionId(); remoteSessionCache.put(sessionId, activated); - updateSessionStateWatcher(sessionId, activated); + updateSessionStateWatcher(sessionId); return applicationSet; } @@ -822,8 +826,9 @@ public class SessionRepository { } /** - * Returns a new local session for the given session id if it does not already exist. - * Will also add the session to the local session cache if necessary + * Create a new local session for the given session id if it does not already exist. + * Will also add the session to the local session cache if necessary. If there is no + * remote session matching the session it will also be created. */ public void createLocalSessionFromDistributedApplicationPackage(long sessionId) { if (applicationRepo.sessionExistsInFileSystem(sessionId)) { @@ -891,15 +896,12 @@ public class SessionRepository { return new TenantFileSystemDirs(configServerDB, tenantName).getUserApplicationDir(sessionId); } - private void updateSessionStateWatcher(long sessionId, RemoteSession remoteSession) { - SessionStateWatcher sessionStateWatcher = sessionStateWatchers.get(sessionId); - if (sessionStateWatcher == null) { - Curator.FileCache fileCache = curator.createFileCache(getSessionStatePath(sessionId).getAbsolute(), false); + private void updateSessionStateWatcher(long sessionId) { + sessionStateWatchers.computeIfAbsent(sessionId, (id) -> { + Curator.FileCache fileCache = curator.createFileCache(getSessionStatePath(id).getAbsolute(), false); fileCache.addListener(this::nodeChanged); - sessionStateWatchers.put(sessionId, new SessionStateWatcher(fileCache, remoteSession, metricUpdater, zkWatcherExecutor, this)); - } else { - sessionStateWatcher.updateRemoteSession(remoteSession); - } + return new SessionStateWatcher(fileCache, id, metricUpdater, zkWatcherExecutor, this); + }); } @Override diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java index e26d145d3d4..b0c87c48a32 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java @@ -13,7 +13,7 @@ import java.util.logging.Logger; import static com.yahoo.vespa.config.server.session.Session.Status; /** - * Watches one particular session (/config/v2/tenants/<tenantName>/sessions/<n>/sessionState in ZooKeeper) + * Watches session state for a session (/config/v2/tenants/<tenantName>/sessions/<n>/sessionState in ZooKeeper) * The session must be in the session repo. * * @author Vegard Havdal @@ -24,18 +24,18 @@ public class SessionStateWatcher { private static final Logger log = Logger.getLogger(SessionStateWatcher.class.getName()); private final Curator.FileCache fileCache; - private volatile RemoteSession session; + private final long sessionId; private final MetricUpdater metrics; private final Executor zkWatcherExecutor; private final SessionRepository sessionRepository; SessionStateWatcher(Curator.FileCache fileCache, - RemoteSession session, + long sessionId, MetricUpdater metrics, Executor zkWatcherExecutor, SessionRepository sessionRepository) { this.fileCache = fileCache; - this.session = session; + this.sessionId = sessionId; this.metrics = metrics; this.fileCache.addListener(this::nodeChanged); this.fileCache.start(); @@ -44,23 +44,21 @@ public class SessionStateWatcher { } private synchronized void sessionStatusChanged(Status newStatus) { - long sessionId = session.getSessionId(); - switch (newStatus) { case NEW: case UNKNOWN: break; case DELETE: - sessionRepository.deactivateAndUpdateCache(session); + sessionRepository.deactivateAndUpdateCache(sessionId); break; case PREPARE: - sessionRepository.prepareRemoteSession(session); + sessionRepository.prepareRemoteSession(sessionId); break; case ACTIVATE: - sessionRepository.activate(session); + sessionRepository.activate(sessionId); break; case DEACTIVATE: - sessionRepository.deactivateAndUpdateCache(session); + sessionRepository.deactivateAndUpdateCache(sessionId); break; default: throw new IllegalStateException("Unknown status " + newStatus); @@ -68,7 +66,7 @@ public class SessionStateWatcher { } public long getSessionId() { - return session.getSessionId(); + return sessionId; } public void close() { @@ -87,10 +85,9 @@ public class SessionStateWatcher { if (node != null) { newStatus = Status.parse(Utf8.toString(node.getData())); - String debugMessage = log.isLoggable(Level.FINE) ? - session.logPre() + "Session " + session.getSessionId() - + " changed status to " + newStatus.name() : - null; + String debugMessage = log.isLoggable(Level.FINE) + ? "Session " + sessionId + " changed status to " + newStatus.name() + : null; if (debugMessage != null) log.fine(debugMessage); sessionStatusChanged(newStatus); @@ -98,15 +95,10 @@ public class SessionStateWatcher { if (debugMessage != null) log.fine(debugMessage + ": Done"); } } catch (Exception e) { - log.log(Level.WARNING, session.logPre() + "Error handling session change to " + - newStatus.name() + " for session " + getSessionId(), e); + log.log(Level.WARNING, "Error handling session change to " + newStatus.name() + " for session " + getSessionId(), e); metrics.incSessionChangeErrors(); } }); } - public synchronized void updateRemoteSession(RemoteSession session) { - this.session = session; - } - } |