aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHÃ¥kon Hallingstad <hakon.hallingstad@gmail.com>2022-07-11 12:39:44 +0200
committerGitHub <noreply@github.com>2022-07-11 12:39:44 +0200
commitfddbd1c759821594990c3b67abd1a82d40463d49 (patch)
tree39f3781f06e010d7c03c4bbfda3dae5bc38ed6cd
parent54281752ea492a263b865192784a47adeab67718 (diff)
parentc1e1c83ccfc68fb73facbba975877d4b58ac5cc6 (diff)
Merge pull request #23446 from vespa-engine/hmusum/use-session-id
Use session id instead of Session in SessionStateWatcher [run-systemtest]
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java59
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java42
2 files changed, 46 insertions, 55 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index d803488cb0a..ecfee6f2b70 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -344,10 +344,10 @@ public class SessionRepository {
public RemoteSession createRemoteSession(long sessionId) {
SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId);
RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient);
- RemoteSession newSession = loadSessionIfActive(session).orElse(session);
- remoteSessionCache.put(sessionId, newSession);
- updateSessionStateWatcher(sessionId, newSession);
- return newSession;
+ loadSessionIfActive(session);
+ remoteSessionCache.put(sessionId, session);
+ updateSessionStateWatcher(sessionId);
+ return session;
}
public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) {
@@ -375,9 +375,11 @@ public class SessionRepository {
return deleted;
}
- public void deactivateAndUpdateCache(RemoteSession remoteSession) {
- RemoteSession session = remoteSession.deactivated();
- remoteSessionCache.put(session.getSessionId(), session);
+ public void deactivateAndUpdateCache(long sessionId) {
+ var s = remoteSessionCache.get(sessionId);
+ if (s == null) return;
+
+ remoteSessionCache.put(sessionId, s.deactivated());
}
public void deleteRemoteSessionFromZooKeeper(Session session) {
@@ -388,7 +390,7 @@ public class SessionRepository {
}
private boolean sessionHasExpired(Instant created, Duration expiryTime, Clock clock) {
- return (created.plus(expiryTime).isBefore(clock.instant()));
+ return created.plus(expiryTime).isBefore(clock.instant());
}
private List<Long> getSessionListFromDirectoryCache(List<ChildData> children) {
@@ -439,8 +441,11 @@ public class SessionRepository {
return session.getStatus() == Session.Status.DELETE;
}
- void activate(RemoteSession session) {
- long sessionId = session.getSessionId();
+ void activate(long sessionId) {
+ createLocalSessionFromDistributedApplicationPackage(sessionId);
+ RemoteSession session = remoteSessionCache.get(sessionId);
+ if (session == null) return;
+
CompletionWaiter waiter = createSessionZooKeeperClient(sessionId).getActiveWaiter();
log.log(Level.FINE, () -> session.logPre() + "Activating " + sessionId);
applicationRepo.activateApplication(ensureApplicationLoaded(session), sessionId);
@@ -449,21 +454,25 @@ public class SessionRepository {
log.log(Level.INFO, session.logPre() + "Session activated: " + sessionId);
}
- private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) {
+ private void loadSessionIfActive(RemoteSession session) {
for (ApplicationId applicationId : applicationRepo.activeApplications()) {
Optional<Long> activeSession = applicationRepo.activeSessionOf(applicationId);
if (activeSession.isPresent() && activeSession.get() == session.getSessionId()) {
log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it");
applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId());
log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")");
- return Optional.ofNullable(remoteSessionCache.get(session.getSessionId()));
+ return;
}
}
- return Optional.empty();
}
- void prepareRemoteSession(RemoteSession session) {
- SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId());
+ void prepareRemoteSession(long sessionId) {
+ // Might need to create local session first
+ createLocalSessionFromDistributedApplicationPackage(sessionId);
+ RemoteSession session = remoteSessionCache.get(sessionId);
+ if (session == null) return;
+
+ SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(sessionId);
CompletionWaiter waiter = sessionZooKeeperClient.getPrepareWaiter();
ensureApplicationLoaded(session);
notifyCompletion(waiter);
@@ -480,7 +489,7 @@ public class SessionRepository {
RemoteSession activated = session.activated(applicationSet);
long sessionId = activated.getSessionId();
remoteSessionCache.put(sessionId, activated);
- updateSessionStateWatcher(sessionId, activated);
+ updateSessionStateWatcher(sessionId);
return applicationSet;
}
@@ -817,8 +826,9 @@ public class SessionRepository {
}
/**
- * Returns a new local session for the given session id if it does not already exist.
- * Will also add the session to the local session cache if necessary
+ * Create a new local session for the given session id if it does not already exist.
+ * Will also add the session to the local session cache if necessary. If there is no
+ * remote session matching the session it will also be created.
*/
public void createLocalSessionFromDistributedApplicationPackage(long sessionId) {
if (applicationRepo.sessionExistsInFileSystem(sessionId)) {
@@ -886,15 +896,12 @@ public class SessionRepository {
return new TenantFileSystemDirs(configServerDB, tenantName).getUserApplicationDir(sessionId);
}
- private void updateSessionStateWatcher(long sessionId, RemoteSession remoteSession) {
- SessionStateWatcher sessionStateWatcher = sessionStateWatchers.get(sessionId);
- if (sessionStateWatcher == null) {
- Curator.FileCache fileCache = curator.createFileCache(getSessionStatePath(sessionId).getAbsolute(), false);
+ private void updateSessionStateWatcher(long sessionId) {
+ sessionStateWatchers.computeIfAbsent(sessionId, (id) -> {
+ Curator.FileCache fileCache = curator.createFileCache(getSessionStatePath(id).getAbsolute(), false);
fileCache.addListener(this::nodeChanged);
- sessionStateWatchers.put(sessionId, new SessionStateWatcher(fileCache, remoteSession, metricUpdater, zkWatcherExecutor, this));
- } else {
- sessionStateWatcher.updateRemoteSession(remoteSession);
- }
+ return new SessionStateWatcher(fileCache, id, metricUpdater, zkWatcherExecutor, this);
+ });
}
@Override
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java
index 6c6f60426fe..abd2266fcf3 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java
@@ -13,7 +13,7 @@ import java.util.logging.Logger;
import static com.yahoo.vespa.config.server.session.Session.Status;
/**
- * Watches one particular session (/config/v2/tenants/&lt;tenantName&gt;/sessions/&lt;n&gt;/sessionState in ZooKeeper)
+ * Watches session state for a session (/config/v2/tenants/&lt;tenantName&gt;/sessions/&lt;n&gt;/sessionState in ZooKeeper)
* The session must be in the session repo.
*
* @author Vegard Havdal
@@ -24,18 +24,18 @@ public class SessionStateWatcher {
private static final Logger log = Logger.getLogger(SessionStateWatcher.class.getName());
private final Curator.FileCache fileCache;
- private volatile RemoteSession session;
+ private final long sessionId;
private final MetricUpdater metrics;
private final Executor zkWatcherExecutor;
private final SessionRepository sessionRepository;
SessionStateWatcher(Curator.FileCache fileCache,
- RemoteSession session,
+ long sessionId,
MetricUpdater metrics,
Executor zkWatcherExecutor,
SessionRepository sessionRepository) {
this.fileCache = fileCache;
- this.session = session;
+ this.sessionId = sessionId;
this.metrics = metrics;
this.fileCache.addListener(this::nodeChanged);
this.fileCache.start();
@@ -44,37 +44,27 @@ public class SessionStateWatcher {
}
private synchronized void sessionStatusChanged(Status newStatus) {
- long sessionId = session.getSessionId();
-
switch (newStatus) {
case NEW:
case UNKNOWN:
break;
case DELETE:
- sessionRepository.deactivateAndUpdateCache(session);
+ case DEACTIVATE:
+ sessionRepository.deactivateAndUpdateCache(sessionId);
break;
case PREPARE:
- createLocalSession(sessionId);
- sessionRepository.prepareRemoteSession(session);
+ sessionRepository.prepareRemoteSession(sessionId);
break;
case ACTIVATE:
- createLocalSession(sessionId);
- sessionRepository.activate(session);
- break;
- case DEACTIVATE:
- sessionRepository.deactivateAndUpdateCache(session);
+ sessionRepository.activate(sessionId);
break;
default:
throw new IllegalStateException("Unknown status " + newStatus);
}
}
- private void createLocalSession(long sessionId) {
- sessionRepository.createLocalSessionFromDistributedApplicationPackage(sessionId);
- }
-
public long getSessionId() {
- return session.getSessionId();
+ return sessionId;
}
public void close() {
@@ -93,10 +83,9 @@ public class SessionStateWatcher {
if (node != null) {
newStatus = Status.parse(Utf8.toString(node.getData()));
- String debugMessage = log.isLoggable(Level.FINE) ?
- session.logPre() + "Session " + session.getSessionId()
- + " changed status to " + newStatus.name() :
- null;
+ String debugMessage = log.isLoggable(Level.FINE)
+ ? "Session " + sessionId + " changed status to " + newStatus.name()
+ : null;
if (debugMessage != null) log.fine(debugMessage);
sessionStatusChanged(newStatus);
@@ -104,15 +93,10 @@ public class SessionStateWatcher {
if (debugMessage != null) log.fine(debugMessage + ": Done");
}
} catch (Exception e) {
- log.log(Level.WARNING, session.logPre() + "Error handling session change to " +
- newStatus.name() + " for session " + getSessionId(), e);
+ log.log(Level.WARNING, "Error handling session change to " + newStatus.name() + " for session " + getSessionId(), e);
metrics.incSessionChangeErrors();
}
});
}
- public synchronized void updateRemoteSession(RemoteSession session) {
- this.session = session;
- }
-
}