summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-06-08 23:17:37 +0200
committerHarald Musum <musum@verizonmedia.com>2020-06-08 23:17:37 +0200
commitb37e4e436dcf3675c02cd1a749c47f468f4191f0 (patch)
tree79cf37b3a845560ba922aeb25e71c074b7918beb /configserver
parent42eaf11c8b0996c9900960865c846c2aeaf3b4b9 (diff)
Move RemoteSessionRepo into SessionRepository
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java13
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSessionRepo.java233
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java218
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenant.java14
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java11
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java6
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/HostHandlerTest.java2
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/session/RemoteSessionRepoTest.java145
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/session/SessionRepositoryTest.java160
9 files changed, 357 insertions, 445 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
index 410dff31555..7adad8d5d55 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java
@@ -52,7 +52,6 @@ import com.yahoo.vespa.config.server.session.LocalSession;
import com.yahoo.vespa.config.server.session.SessionRepository;
import com.yahoo.vespa.config.server.session.PrepareParams;
import com.yahoo.vespa.config.server.session.RemoteSession;
-import com.yahoo.vespa.config.server.session.RemoteSessionRepo;
import com.yahoo.vespa.config.server.session.Session;
import com.yahoo.vespa.config.server.session.SessionFactory;
import com.yahoo.vespa.config.server.session.SilentDeployLogger;
@@ -492,7 +491,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
Tenant tenant = tenantRepository.getTenant(applicationId.tenant());
if (tenant == null) throw new NotFoundException("Tenant '" + applicationId.tenant() + "' not found");
long sessionId = getSessionIdForApplication(tenant, applicationId);
- RemoteSession session = tenant.getRemoteSessionRepo().getSession(sessionId);
+ RemoteSession session = tenant.getSessionRepo().getRemoteSession(sessionId);
if (session == null) throw new NotFoundException("Remote session " + sessionId + " not found");
return session.ensureApplicationLoaded().getForVersionOrLatest(version, clock.instant());
} catch (NotFoundException e) {
@@ -529,10 +528,10 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
}
private boolean localSessionHasBeenDeleted(ApplicationId applicationId, long sessionId, Duration waitTime) {
- RemoteSessionRepo remoteSessionRepo = tenantRepository.getTenant(applicationId.tenant()).getRemoteSessionRepo();
+ SessionRepository sessionRepository = tenantRepository.getTenant(applicationId.tenant()).getSessionRepo();
Instant end = Instant.now().plus(waitTime);
do {
- if (remoteSessionRepo.getSession(sessionId) == null) return true;
+ if (sessionRepository.getRemoteSession(sessionId) == null) return true;
try { Thread.sleep(10); } catch (InterruptedException e) { /* ignored */}
} while (Instant.now().isBefore(end));
@@ -690,7 +689,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) {
return tenantRepository.getAllTenants()
.stream()
- .map(tenant -> tenant.getRemoteSessionRepo().deleteExpiredSessions(clock, expiryTime))
+ .map(tenant -> tenant.getSessionRepo().deleteExpiredRemoteSessions(clock, expiryTime))
.mapToInt(i -> i)
.sum();
}
@@ -757,7 +756,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
}
private RemoteSession getRemoteSession(Tenant tenant, long sessionId) {
- RemoteSession session = tenant.getRemoteSessionRepo().getSession(sessionId);
+ RemoteSession session = tenant.getSessionRepo().getRemoteSession(sessionId);
if (session == null) throw new NotFoundException("Session " + sessionId + " was not found");
return session;
@@ -808,7 +807,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye
private RemoteSession getActiveSession(Tenant tenant, ApplicationId applicationId) {
TenantApplications applicationRepo = tenant.getApplicationRepo();
if (applicationRepo.activeApplications().contains(applicationId)) {
- return tenant.getRemoteSessionRepo().getSession(applicationRepo.requireActiveSessionOf(applicationId));
+ return tenant.getSessionRepo().getRemoteSession(applicationRepo.requireActiveSessionOf(applicationId));
}
return null;
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSessionRepo.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSessionRepo.java
deleted file mode 100644
index 316f7f7778d..00000000000
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSessionRepo.java
+++ /dev/null
@@ -1,233 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.config.server.session;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-import com.yahoo.concurrent.StripedExecutor;
-import com.yahoo.config.provision.ApplicationId;
-import com.yahoo.config.provision.TenantName;
-
-import java.time.Clock;
-import java.util.logging.Level;
-import com.yahoo.path.Path;
-import com.yahoo.vespa.config.server.GlobalComponentRegistry;
-import com.yahoo.vespa.config.server.ReloadHandler;
-import com.yahoo.vespa.config.server.application.TenantApplications;
-import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
-import com.yahoo.vespa.config.server.monitoring.Metrics;
-import com.yahoo.vespa.config.server.tenant.TenantRepository;
-import com.yahoo.vespa.config.server.zookeeper.ConfigCurator;
-import com.yahoo.vespa.curator.Curator;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-
-/**
- * Session repository for RemoteSessions. There is one such repo per tenant.
- * Will watch/prepare sessions (applications) based on watched nodes in ZooKeeper. The zookeeper state watched in
- * this class is shared between all config servers, so it should not modify any global state, because the operation
- * will be performed on all servers. The repo can be regarded as read only from the POV of the configserver.
- *
- * @author Vegard Havdal
- * @author Ulf Lilleengen
- */
-public class RemoteSessionRepo {
-
- private static final Logger log = Logger.getLogger(RemoteSessionRepo.class.getName());
-
- private final Curator curator;
- private final Path sessionsPath;
- private final SessionFactory sessionFactory;
- private final Map<Long, RemoteSessionStateWatcher> sessionStateWatchers = new HashMap<>();
- private final ReloadHandler reloadHandler;
- private final TenantName tenantName;
- private final MetricUpdater metrics;
- private final Curator.DirectoryCache directoryCache;
- private final TenantApplications applicationRepo;
- private final Executor zkWatcherExecutor;
- private final SessionCache<RemoteSession> sessionCache;
-
- public RemoteSessionRepo(GlobalComponentRegistry componentRegistry,
- SessionFactory sessionFactory,
- ReloadHandler reloadHandler,
- TenantName tenantName,
- TenantApplications applicationRepo) {
- this.sessionCache = new SessionCache<>();
- this.curator = componentRegistry.getCurator();
- this.sessionsPath = TenantRepository.getSessionsPath(tenantName);
- this.applicationRepo = applicationRepo;
- this.sessionFactory = sessionFactory;
- this.reloadHandler = reloadHandler;
- this.tenantName = tenantName;
- this.metrics = componentRegistry.getMetrics().getOrCreateMetricUpdater(Metrics.createDimensions(tenantName));
- StripedExecutor<TenantName> zkWatcherExecutor = componentRegistry.getZkWatcherExecutor();
- this.zkWatcherExecutor = command -> zkWatcherExecutor.execute(tenantName, command);
- initializeSessions();
- this.directoryCache = curator.createDirectoryCache(sessionsPath.getAbsolute(), false, false, componentRegistry.getZkCacheExecutor());
- this.directoryCache.addListener(this::childEvent);
- this.directoryCache.start();
- }
-
- public RemoteSession getSession(long sessionId) {
- return sessionCache.getSession(sessionId);
- }
-
- public List<Long> getSessions() {
- return getSessionList(curator.getChildren(sessionsPath));
- }
-
- public void addSession(RemoteSession session) {
- sessionCache.addSession(session);
- }
-
- public int deleteExpiredSessions(Clock clock, Duration expiryTime) {
- int deleted = 0;
- for (long sessionId : getSessions()) {
- RemoteSession session = sessionCache.getSession(sessionId);
- if (session == null) continue; // Internal sessions not in synch with zk, continue
- if (session.getStatus() == Session.Status.ACTIVATE) continue;
- if (sessionHasExpired(session.getCreateTime(), expiryTime, clock)) {
- log.log(Level.INFO, "Remote session " + sessionId + " for " + tenantName + " has expired, deleting it");
- session.delete();
- deleted++;
- }
- }
- return deleted;
- }
-
- private boolean sessionHasExpired(Instant created, Duration expiryTime, Clock clock) {
- return (created.plus(expiryTime).isBefore(clock.instant()));
- }
-
- private List<Long> getSessionListFromDirectoryCache(List<ChildData> children) {
- return getSessionList(children.stream()
- .map(child -> Path.fromString(child.getPath()).getName())
- .collect(Collectors.toList()));
- }
-
- private List<Long> getSessionList(List<String> children) {
- return children.stream().map(Long::parseLong).collect(Collectors.toList());
- }
-
- private void initializeSessions() throws NumberFormatException {
- getSessions().forEach(this::sessionAdded);
- }
-
- private synchronized void sessionsChanged() throws NumberFormatException {
- List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData());
- checkForRemovedSessions(sessions);
- checkForAddedSessions(sessions);
- }
-
- private void checkForRemovedSessions(List<Long> sessions) {
- for (RemoteSession session : sessionCache.getSessions())
- if ( ! sessions.contains(session.getSessionId()))
- sessionRemoved(session.getSessionId());
- }
-
- private void checkForAddedSessions(List<Long> sessions) {
- for (Long sessionId : sessions)
- if (sessionCache.getSession(sessionId) == null)
- sessionAdded(sessionId);
- }
-
- /**
- * A session for which we don't have a watcher, i.e. hitherto unknown to us.
- *
- * @param sessionId session id for the new session
- */
- private void sessionAdded(long sessionId) {
- log.log(Level.FINE, () -> "Adding session to RemoteSessionRepo: " + sessionId);
- RemoteSession session = sessionFactory.createRemoteSession(sessionId);
- Path sessionPath = sessionsPath.append(String.valueOf(sessionId));
- Curator.FileCache fileCache = curator.createFileCache(sessionPath.append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH).getAbsolute(), false);
- fileCache.addListener(this::nodeChanged);
- loadSessionIfActive(session);
- addSession(session);
- metrics.incAddedSessions();
- sessionStateWatchers.put(sessionId, new RemoteSessionStateWatcher(fileCache, reloadHandler, session, metrics, zkWatcherExecutor));
- }
-
- private void sessionRemoved(long sessionId) {
- RemoteSessionStateWatcher watcher = sessionStateWatchers.remove(sessionId);
- if (watcher != null) watcher.close();
- sessionCache.removeSession(sessionId);
- metrics.incRemovedSessions();
- }
-
- private void loadSessionIfActive(RemoteSession session) {
- for (ApplicationId applicationId : applicationRepo.activeApplications()) {
- if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) {
- log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it");
- reloadHandler.reloadConfig(session.ensureApplicationLoaded());
- log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")");
- return;
- }
- }
- }
-
- public synchronized void close() {
- try {
- if (directoryCache != null) {
- directoryCache.close();
- }
- } catch (Exception e) {
- log.log(Level.WARNING, "Exception when closing path cache", e);
- } finally {
- checkForRemovedSessions(new ArrayList<>());
- }
- }
-
- private void nodeChanged() {
- zkWatcherExecutor.execute(() -> {
- Multiset<Session.Status> sessionMetrics = HashMultiset.create();
- for (RemoteSession session : sessionCache.getSessions()) {
- sessionMetrics.add(session.getStatus());
- }
- metrics.setNewSessions(sessionMetrics.count(Session.Status.NEW));
- metrics.setPreparedSessions(sessionMetrics.count(Session.Status.PREPARE));
- metrics.setActivatedSessions(sessionMetrics.count(Session.Status.ACTIVATE));
- metrics.setDeactivatedSessions(sessionMetrics.count(Session.Status.DEACTIVATE));
- });
- }
-
- @SuppressWarnings("unused")
- private void childEvent(CuratorFramework ignored, PathChildrenCacheEvent event) {
- zkWatcherExecutor.execute(() -> {
- log.log(Level.FINE, () -> "Got child event: " + event);
- switch (event.getType()) {
- case CHILD_ADDED:
- sessionsChanged();
- synchronizeOnNew(getSessionListFromDirectoryCache(Collections.singletonList(event.getData())));
- break;
- case CHILD_REMOVED:
- sessionsChanged();
- break;
- case CONNECTION_RECONNECTED:
- sessionsChanged();
- break;
- }
- });
- }
-
- private void synchronizeOnNew(List<Long> sessionList) {
- for (long sessionId : sessionList) {
- RemoteSession session = sessionCache.getSession(sessionId);
- if (session == null) continue; // session might have been deleted after getting session list
- log.log(Level.FINE, () -> session.logPre() + "Confirming upload for session " + sessionId);
- session.confirmUpload();
- }
- }
-
-}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index 2390c05d84a..12fdb6300b4 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -1,15 +1,24 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.session;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.TenantName;
import com.yahoo.path.Path;
import com.yahoo.transaction.NestedTransaction;
import com.yahoo.vespa.config.server.GlobalComponentRegistry;
+import com.yahoo.vespa.config.server.ReloadHandler;
+import com.yahoo.vespa.config.server.application.TenantApplications;
import com.yahoo.vespa.config.server.deploy.TenantFileSystemDirs;
+import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
+import com.yahoo.vespa.config.server.monitoring.Metrics;
import com.yahoo.vespa.config.server.tenant.TenantRepository;
import com.yahoo.vespa.config.server.zookeeper.ConfigCurator;
import com.yahoo.vespa.curator.Curator;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import java.io.File;
import java.io.FilenameFilter;
@@ -17,15 +26,16 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
- *
* Session repository for config server. Stores session state in zookeeper and file system. There are two
* different session types (RemoteSession and LocalSession).
*
@@ -36,41 +46,65 @@ public class SessionRepository {
private static final Logger log = Logger.getLogger(SessionRepository.class.getName());
private static final FilenameFilter sessionApplicationsFilter = (dir, name) -> name.matches("\\d+");
- private final SessionCache<LocalSession> sessionCache;
- private final Map<Long, LocalSessionStateWatcher> sessionStateWatchers = new HashMap<>();
+ private final SessionCache<LocalSession> localSessionCache = new SessionCache<>();
+ private final SessionCache<RemoteSession> remoteSessionCache = new SessionCache<>();
+ private final Map<Long, LocalSessionStateWatcher> localSessionStateWatchers = new HashMap<>();
+ private final Map<Long, RemoteSessionStateWatcher> remoteSessionStateWatchers = new HashMap<>();
private final Duration sessionLifetime;
private final Clock clock;
private final Curator curator;
private final Executor zkWatcherExecutor;
private final TenantFileSystemDirs tenantFileSystemDirs;
+ private final ReloadHandler reloadHandler;
+ private final MetricUpdater metrics;
+ private final Curator.DirectoryCache directoryCache;
+ private final TenantApplications applicationRepo;
+ private final Path sessionsPath;
+ private final SessionFactory sessionFactory;
+ private final TenantName tenantName;
+
- public SessionRepository(TenantName tenantName, GlobalComponentRegistry componentRegistry, SessionFactory sessionFactory) {
- sessionCache = new SessionCache<>();
+ public SessionRepository(TenantName tenantName, GlobalComponentRegistry componentRegistry,
+ SessionFactory sessionFactory, TenantApplications applicationRepo,
+ ReloadHandler reloadHandler) {
+ this.tenantName = tenantName;
this.clock = componentRegistry.getClock();
this.curator = componentRegistry.getCurator();
this.sessionLifetime = Duration.ofSeconds(componentRegistry.getConfigserverConfig().sessionLifetime());
this.zkWatcherExecutor = command -> componentRegistry.getZkWatcherExecutor().execute(tenantName, command);
this.tenantFileSystemDirs = new TenantFileSystemDirs(componentRegistry.getConfigServerDB(), tenantName);
- loadSessions(sessionFactory);
+ this.sessionFactory = sessionFactory;
+ this.applicationRepo = applicationRepo;
+ loadLocalSessions(sessionFactory);
+
+ this.reloadHandler = reloadHandler;
+ this.sessionsPath = TenantRepository.getSessionsPath(tenantName);
+ this.metrics = componentRegistry.getMetrics().getOrCreateMetricUpdater(Metrics.createDimensions(tenantName));
+ initializeRemoteSessions();
+ this.directoryCache = curator.createDirectoryCache(sessionsPath.getAbsolute(), false, false, componentRegistry.getZkCacheExecutor());
+ this.directoryCache.addListener(this::childEvent);
+ this.directoryCache.start();
}
+ // ---------------- Local sessions ----------------------------------------------------------------
+
public synchronized void addSession(LocalSession session) {
- sessionCache.addSession(session);
+ localSessionCache.addSession(session);
Path sessionsPath = TenantRepository.getSessionsPath(session.getTenantName());
long sessionId = session.getSessionId();
Curator.FileCache fileCache = curator.createFileCache(sessionsPath.append(String.valueOf(sessionId)).append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH).getAbsolute(), false);
- sessionStateWatchers.put(sessionId, new LocalSessionStateWatcher(fileCache, session, this, zkWatcherExecutor));
+ localSessionStateWatchers.put(sessionId, new LocalSessionStateWatcher(fileCache, session, this, zkWatcherExecutor));
}
public LocalSession getSession(long sessionId) {
- return sessionCache.getSession(sessionId);
+ return localSessionCache.getSession(sessionId);
}
public List<LocalSession> getSessions() {
- return sessionCache.getSessions();
+ return localSessionCache.getSessions();
}
- private void loadSessions(SessionFactory sessionFactory) {
+ private void loadLocalSessions(SessionFactory sessionFactory) {
File[] sessions = tenantFileSystemDirs.sessionsPath().listFiles(sessionApplicationsFilter);
if (sessions == null) {
return;
@@ -80,7 +114,7 @@ public class SessionRepository {
addSession(sessionFactory.createSessionFromId(Long.parseLong(session.getName())));
} catch (IllegalArgumentException e) {
log.log(Level.WARNING, "Could not load session '" +
- session.getAbsolutePath() + "':" + e.getMessage() + ", skipping it.");
+ session.getAbsolutePath() + "':" + e.getMessage() + ", skipping it.");
}
}
}
@@ -88,7 +122,7 @@ public class SessionRepository {
public void deleteExpiredSessions(Map<ApplicationId, Long> activeSessions) {
log.log(Level.FINE, "Purging old sessions");
try {
- for (LocalSession candidate : sessionCache.getSessions()) {
+ for (LocalSession candidate : localSessionCache.getSessions()) {
Instant createTime = candidate.getCreateTime();
log.log(Level.FINE, "Candidate session for deletion: " + candidate.getSessionId() + ", created: " + createTime);
@@ -124,9 +158,9 @@ public class SessionRepository {
public void deleteSession(LocalSession session) {
long sessionId = session.getSessionId();
log.log(Level.FINE, "Deleting local session " + sessionId);
- LocalSessionStateWatcher watcher = sessionStateWatchers.remove(sessionId);
- if (watcher != null) watcher.close();
- sessionCache.removeSession(sessionId);
+ LocalSessionStateWatcher watcher = localSessionStateWatchers.remove(sessionId);
+ if (watcher != null) watcher.close();
+ localSessionCache.removeSession(sessionId);
NestedTransaction transaction = new NestedTransaction();
session.delete(transaction);
transaction.commit();
@@ -135,15 +169,165 @@ public class SessionRepository {
public void close() {
deleteAllSessions();
tenantFileSystemDirs.delete();
+ try {
+ if (directoryCache != null) {
+ directoryCache.close();
+ }
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Exception when closing path cache", e);
+ } finally {
+ checkForRemovedSessions(new ArrayList<>());
+ }
}
private void deleteAllSessions() {
- List<LocalSession> sessions = new ArrayList<>(sessionCache.getSessions());
+ List<LocalSession> sessions = new ArrayList<>(localSessionCache.getSessions());
for (LocalSession session : sessions) {
deleteSession(session);
}
}
+ // ---------------- Remote sessions ----------------------------------------------------------------
+
+ public RemoteSession getRemoteSession(long sessionId) {
+ return remoteSessionCache.getSession(sessionId);
+ }
+
+ public List<Long> getRemoteSessions() {
+ return getRemoteSessionList(curator.getChildren(sessionsPath));
+ }
+
+ public void addRemoteSession(RemoteSession session) {
+ remoteSessionCache.addSession(session);
+ }
+
+ public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) {
+ int deleted = 0;
+ for (long sessionId : getRemoteSessions()) {
+ RemoteSession session = remoteSessionCache.getSession(sessionId);
+ if (session == null) continue; // Internal sessions not in synch with zk, continue
+ if (session.getStatus() == Session.Status.ACTIVATE) continue;
+ if (remoteSessionHasExpired(session.getCreateTime(), expiryTime, clock)) {
+ log.log(Level.INFO, "Remote session " + sessionId + " for " + tenantName + " has expired, deleting it");
+ session.delete();
+ deleted++;
+ }
+ }
+ return deleted;
+ }
+
+ private boolean remoteSessionHasExpired(Instant created, Duration expiryTime, Clock clock) {
+ return (created.plus(expiryTime).isBefore(clock.instant()));
+ }
+
+ private List<Long> getRemoteSessionListFromDirectoryCache(List<ChildData> children) {
+ return getRemoteSessionList(children.stream()
+ .map(child -> Path.fromString(child.getPath()).getName())
+ .collect(Collectors.toList()));
+ }
+
+ private List<Long> getRemoteSessionList(List<String> children) {
+ return children.stream().map(Long::parseLong).collect(Collectors.toList());
+ }
+
+ private void initializeRemoteSessions() throws NumberFormatException {
+ getRemoteSessions().forEach(this::sessionAdded);
+ }
+
+ private synchronized void remoteSessionsChanged() throws NumberFormatException {
+ List<Long> sessions = getRemoteSessionListFromDirectoryCache(directoryCache.getCurrentData());
+ checkForRemovedSessions(sessions);
+ checkForAddedSessions(sessions);
+ }
+
+ private void checkForRemovedSessions(List<Long> sessions) {
+ for (RemoteSession session : remoteSessionCache.getSessions())
+ if (!sessions.contains(session.getSessionId()))
+ sessionRemoved(session.getSessionId());
+ }
+
+ private void checkForAddedSessions(List<Long> sessions) {
+ for (Long sessionId : sessions)
+ if (remoteSessionCache.getSession(sessionId) == null)
+ sessionAdded(sessionId);
+ }
+
+ /**
+ * A session for which we don't have a watcher, i.e. hitherto unknown to us.
+ *
+ * @param sessionId session id for the new session
+ */
+ private void sessionAdded(long sessionId) {
+ log.log(Level.FINE, () -> "Adding session to RemoteSessionRepo: " + sessionId);
+ RemoteSession session = sessionFactory.createRemoteSession(sessionId);
+ Path sessionPath = sessionsPath.append(String.valueOf(sessionId));
+ Curator.FileCache fileCache = curator.createFileCache(sessionPath.append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH).getAbsolute(), false);
+ fileCache.addListener(this::nodeChanged);
+ loadSessionIfActive(session);
+ addRemoteSession(session);
+ metrics.incAddedSessions();
+ remoteSessionStateWatchers.put(sessionId, new RemoteSessionStateWatcher(fileCache, reloadHandler, session, metrics, zkWatcherExecutor));
+ }
+
+ private void sessionRemoved(long sessionId) {
+ RemoteSessionStateWatcher watcher = remoteSessionStateWatchers.remove(sessionId);
+ if (watcher != null) watcher.close();
+ remoteSessionCache.removeSession(sessionId);
+ metrics.incRemovedSessions();
+ }
+
+ private void loadSessionIfActive(RemoteSession session) {
+ for (ApplicationId applicationId : applicationRepo.activeApplications()) {
+ if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) {
+ log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it");
+ reloadHandler.reloadConfig(session.ensureApplicationLoaded());
+ log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")");
+ return;
+ }
+ }
+ }
+
+ private void nodeChanged() {
+ zkWatcherExecutor.execute(() -> {
+ Multiset<Session.Status> sessionMetrics = HashMultiset.create();
+ for (RemoteSession session : remoteSessionCache.getSessions()) {
+ sessionMetrics.add(session.getStatus());
+ }
+ metrics.setNewSessions(sessionMetrics.count(Session.Status.NEW));
+ metrics.setPreparedSessions(sessionMetrics.count(Session.Status.PREPARE));
+ metrics.setActivatedSessions(sessionMetrics.count(Session.Status.ACTIVATE));
+ metrics.setDeactivatedSessions(sessionMetrics.count(Session.Status.DEACTIVATE));
+ });
+ }
+
+ @SuppressWarnings("unused")
+ private void childEvent(CuratorFramework ignored, PathChildrenCacheEvent event) {
+ zkWatcherExecutor.execute(() -> {
+ log.log(Level.FINE, () -> "Got child event: " + event);
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ remoteSessionsChanged();
+ synchronizeOnNew(getRemoteSessionListFromDirectoryCache(Collections.singletonList(event.getData())));
+ break;
+ case CHILD_REMOVED:
+ remoteSessionsChanged();
+ break;
+ case CONNECTION_RECONNECTED:
+ remoteSessionsChanged();
+ break;
+ }
+ });
+ }
+
+ private void synchronizeOnNew(List<Long> sessionList) {
+ for (long sessionId : sessionList) {
+ RemoteSession session = remoteSessionCache.getSession(sessionId);
+ if (session == null) continue; // session might have been deleted after getting session list
+ log.log(Level.FINE, () -> session.logPre() + "Confirming upload for session " + sessionId);
+ session.confirmUpload();
+ }
+ }
+
@Override
public String toString() {
return getSessions().toString();
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenant.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenant.java
index fb8af523bae..2f5af4bd6d5 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenant.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenant.java
@@ -6,9 +6,8 @@ import com.yahoo.path.Path;
import com.yahoo.vespa.config.server.ReloadHandler;
import com.yahoo.vespa.config.server.RequestHandler;
import com.yahoo.vespa.config.server.application.TenantApplications;
-import com.yahoo.vespa.config.server.session.SessionRepository;
-import com.yahoo.vespa.config.server.session.RemoteSessionRepo;
import com.yahoo.vespa.config.server.session.SessionFactory;
+import com.yahoo.vespa.config.server.session.SessionRepository;
import com.yahoo.vespa.curator.Curator;
import org.apache.zookeeper.data.Stat;
@@ -28,7 +27,6 @@ public class Tenant implements TenantHandlerProvider {
static final String APPLICATIONS = "applications";
private final TenantName name;
- private final RemoteSessionRepo remoteSessionRepo;
private final Path path;
private final SessionFactory sessionFactory;
private final SessionRepository sessionRepository;
@@ -40,7 +38,6 @@ public class Tenant implements TenantHandlerProvider {
Tenant(TenantName name,
SessionFactory sessionFactory,
SessionRepository sessionRepository,
- RemoteSessionRepo remoteSessionRepo,
RequestHandler requestHandler,
ReloadHandler reloadHandler,
TenantApplications applicationRepo,
@@ -49,7 +46,6 @@ public class Tenant implements TenantHandlerProvider {
this.path = TenantRepository.getTenantPath(name);
this.requestHandler = requestHandler;
this.reloadHandler = reloadHandler;
- this.remoteSessionRepo = remoteSessionRepo;
this.sessionFactory = sessionFactory;
this.sessionRepository = sessionRepository;
this.applicationRepo = applicationRepo;
@@ -79,8 +75,8 @@ public class Tenant implements TenantHandlerProvider {
*
* @return repo
*/
- public RemoteSessionRepo getRemoteSessionRepo() {
- return remoteSessionRepo;
+ public SessionRepository getSessionRepo() {
+ return sessionRepository;
}
public TenantName getName() {
@@ -140,9 +136,9 @@ public class Tenant implements TenantHandlerProvider {
* Called by watchers as a reaction to {@link #delete()}.
*/
void close() {
- remoteSessionRepo.close(); // Closes watchers and clears memory.
+ sessionRepository.close(); // Closes watchers and clears memory.
applicationRepo.close(); // Closes watchers.
- sessionRepository.close(); // Closes watchers, clears memory, and deletes local files and ZK session state.
+ sessionRepository.close(); // Closes watchers, clears memory, and deletes local files and ZK session state.
}
/** Deletes the tenant tree from ZooKeeper (application and session status for the tenant) and triggers {@link #close()}. */
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
index fb5ad966eb0..6f758bb4d27 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
@@ -14,7 +14,6 @@ import com.yahoo.vespa.config.server.RequestHandler;
import com.yahoo.vespa.config.server.application.TenantApplications;
import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
import com.yahoo.vespa.config.server.session.SessionRepository;
-import com.yahoo.vespa.config.server.session.RemoteSessionRepo;
import com.yahoo.vespa.config.server.session.SessionFactory;
import com.yahoo.vespa.curator.Curator;
import org.apache.curator.framework.CuratorFramework;
@@ -222,14 +221,10 @@ public class TenantRepository {
if (reloadHandler == null)
reloadHandler = applicationRepo;
SessionFactory sessionFactory = new SessionFactory(componentRegistry, applicationRepo, applicationRepo, tenantName);
- SessionRepository sessionRepository = new SessionRepository(tenantName, componentRegistry, sessionFactory);
- RemoteSessionRepo remoteSessionRepo = new RemoteSessionRepo(componentRegistry,
- sessionFactory,
- reloadHandler,
- tenantName,
- applicationRepo);
+ SessionRepository sessionRepository = new SessionRepository(tenantName, componentRegistry, sessionFactory,
+ applicationRepo, reloadHandler);
log.log(Level.INFO, "Creating tenant '" + tenantName + "'");
- Tenant tenant = new Tenant(tenantName, sessionFactory, sessionRepository, remoteSessionRepo, requestHandler,
+ Tenant tenant = new Tenant(tenantName, sessionFactory, sessionRepository, requestHandler,
reloadHandler, applicationRepo, componentRegistry.getCurator());
notifyNewTenant(tenant);
tenants.putIfAbsent(tenantName, tenant);
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java
index 77ecf3f7fdd..ab75f64c92c 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java
@@ -298,14 +298,14 @@ public class ApplicationRepositoryTest {
LocalSession applicationData = tenant.getSessionRepository().getSession(sessionId);
assertNotNull(applicationData);
assertNotNull(applicationData.getApplicationId());
- assertNotNull(tenant.getRemoteSessionRepo().getSession(sessionId));
+ assertNotNull(tenant.getSessionRepo().getRemoteSession(sessionId));
assertNotNull(applicationRepository.getActiveSession(applicationId()));
// Delete app and verify that it has been deleted from repos and provisioner
assertTrue(applicationRepository.delete(applicationId()));
assertNull(applicationRepository.getActiveSession(applicationId()));
assertNull(tenant.getSessionRepository().getSession(sessionId));
- assertNull(tenant.getRemoteSessionRepo().getSession(sessionId));
+ assertNull(tenant.getSessionRepo().getRemoteSession(sessionId));
assertTrue(provisioner.removed);
assertEquals(tenant.getName(), provisioner.lastApplicationId.tenant());
assertEquals(applicationId(), provisioner.lastApplicationId);
@@ -346,7 +346,7 @@ public class ApplicationRepositoryTest {
RemoteSession activeSession = applicationRepository.getActiveSession(applicationId());
assertNull(activeSession);
Tenant tenant = tenantRepository.getTenant(applicationId().tenant());
- assertNull(tenant.getRemoteSessionRepo().getSession(prepareResult.sessionId()));
+ assertNull(tenant.getSessionRepo().getRemoteSession(prepareResult.sessionId()));
assertTrue(applicationRepository.delete(applicationId()));
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/HostHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/HostHandlerTest.java
index 362b92a8d85..457acf8c376 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/HostHandlerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/HostHandlerTest.java
@@ -56,7 +56,7 @@ public class HostHandlerTest {
TestComponentRegistry componentRegistry = new TestComponentRegistry.Builder()
.modelFactoryRegistry(new ModelFactoryRegistry(Collections.singletonList(new VespaModelFactory(new NullConfigModelRegistry()))))
.build();
- tenant.getRemoteSessionRepo().addSession(new RemoteSession(tenant.getName(), sessionId, componentRegistry, new MockSessionZKClient(app)));
+ tenant.getSessionRepo().addRemoteSession(new RemoteSession(tenant.getName(), sessionId, componentRegistry, new MockSessionZKClient(app)));
}
@Before
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/session/RemoteSessionRepoTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/session/RemoteSessionRepoTest.java
deleted file mode 100644
index 468dd5a15a7..00000000000
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/session/RemoteSessionRepoTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.config.server.session;
-
-import com.yahoo.config.provision.TenantName;
-import com.yahoo.path.Path;
-import com.yahoo.text.Utf8;
-import com.yahoo.vespa.config.server.TestComponentRegistry;
-import com.yahoo.vespa.config.server.tenant.Tenant;
-import com.yahoo.vespa.config.server.tenant.TenantRepository;
-import com.yahoo.vespa.config.server.zookeeper.ConfigCurator;
-import com.yahoo.vespa.curator.Curator;
-import com.yahoo.vespa.curator.mock.MockCurator;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.function.LongPredicate;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-/**
- * @author Ulf Lilleengen
- */
-public class RemoteSessionRepoTest {
-
- private static final TenantName tenantName = TenantName.defaultName();
-
- private RemoteSessionRepo remoteSessionRepo;
- private Curator curator;
- TenantRepository tenantRepository;
-
- @Before
- public void setupFacade() {
- curator = new MockCurator();
- TestComponentRegistry componentRegistry = new TestComponentRegistry.Builder()
- .curator(curator)
- .build();
- tenantRepository = new TenantRepository(componentRegistry, false);
- tenantRepository.addTenant(tenantName);
- this.remoteSessionRepo = tenantRepository.getTenant(tenantName).getRemoteSessionRepo();
- curator.create(TenantRepository.getTenantPath(tenantName).append("/applications"));
- curator.create(TenantRepository.getSessionsPath(tenantName));
- createSession(1L, false);
- createSession(2L, false);
- }
-
- private void createSession(long sessionId, boolean wait) {
- createSession(sessionId, wait, tenantName);
- }
-
- private void createSession(long sessionId, boolean wait, TenantName tenantName) {
- Path sessionsPath = TenantRepository.getSessionsPath(tenantName);
- SessionZooKeeperClient zkc = new SessionZooKeeperClient(curator, sessionsPath.append(String.valueOf(sessionId)));
- zkc.createNewSession(Instant.now());
- if (wait) {
- Curator.CompletionWaiter waiter = zkc.getUploadWaiter();
- waiter.awaitCompletion(Duration.ofSeconds(120));
- }
- }
-
- @Test
- public void testInitialize() {
- assertSessionExists(1L);
- assertSessionExists(2L);
- }
-
- @Test
- public void testCreateSession() {
- createSession(3L, true);
- assertSessionExists(3L);
- }
-
- @Test
- public void testSessionStateChange() throws Exception {
- long sessionId = 3L;
- createSession(sessionId, true);
- assertSessionStatus(sessionId, Session.Status.NEW);
- assertStatusChange(sessionId, Session.Status.PREPARE);
- assertStatusChange(sessionId, Session.Status.ACTIVATE);
-
- Path session = TenantRepository.getSessionsPath(tenantName).append("" + sessionId);
- curator.delete(session);
- assertSessionRemoved(sessionId);
- assertNull(remoteSessionRepo.getSession(sessionId));
- }
-
- // If reading a session throws an exception it should be handled and not prevent other applications
- // from loading. In this test we just show that we end up with one session in remote session
- // repo even if it had bad data (by making getSessionIdForApplication() in FailingTenantApplications
- // throw an exception).
- @Test
- public void testBadApplicationRepoOnActivate() {
- long sessionId = 3L;
- TenantName mytenant = TenantName.from("mytenant");
- curator.set(TenantRepository.getApplicationsPath(mytenant).append("mytenant:appX:default"), new byte[0]); // Invalid data
- tenantRepository.addTenant(mytenant);
- Tenant tenant = tenantRepository.getTenant(mytenant);
- curator.create(TenantRepository.getSessionsPath(mytenant));
- remoteSessionRepo = tenant.getRemoteSessionRepo();
- assertThat(remoteSessionRepo.getSessions().size(), is(0));
- createSession(sessionId, true, mytenant);
- assertThat(remoteSessionRepo.getSessions().size(), is(1));
- }
-
- private void assertStatusChange(long sessionId, Session.Status status) throws Exception {
- Path statePath = TenantRepository.getSessionsPath(tenantName).append("" + sessionId).append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH);
- curator.create(statePath);
- curator.framework().setData().forPath(statePath.getAbsolute(), Utf8.toBytes(status.toString()));
- assertSessionStatus(sessionId, status);
- }
-
- private void assertSessionRemoved(long sessionId) {
- waitFor(p -> remoteSessionRepo.getSession(sessionId) == null, sessionId);
- assertNull(remoteSessionRepo.getSession(sessionId));
- }
-
- private void assertSessionExists(long sessionId) {
- assertSessionStatus(sessionId, Session.Status.NEW);
- }
-
- private void assertSessionStatus(long sessionId, Session.Status status) {
- waitFor(p -> remoteSessionRepo.getSession(sessionId) != null &&
- remoteSessionRepo.getSession(sessionId).getStatus() == status, sessionId);
- assertNotNull(remoteSessionRepo.getSession(sessionId));
- assertThat(remoteSessionRepo.getSession(sessionId).getStatus(), is(status));
- }
-
- private void waitFor(LongPredicate predicate, long sessionId) {
- long endTime = System.currentTimeMillis() + 60_000;
- boolean ok;
- do {
- ok = predicate.test(sessionId);
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } while (System.currentTimeMillis() < endTime && !ok);
- }
-
-}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/session/SessionRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/session/SessionRepositoryTest.java
index 1efc0c0cd84..dbd71bed581 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/session/SessionRepositoryTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/session/SessionRepositoryTest.java
@@ -5,11 +5,16 @@ import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.config.model.application.provider.FilesApplicationPackage;
import com.yahoo.config.provision.TenantName;
import com.yahoo.io.IOUtils;
+import com.yahoo.text.Utf8;
import com.yahoo.vespa.config.server.GlobalComponentRegistry;
import com.yahoo.vespa.config.server.TestComponentRegistry;
import com.yahoo.vespa.config.server.application.TenantApplications;
import com.yahoo.vespa.config.server.host.HostRegistry;
import com.yahoo.vespa.config.server.http.SessionHandlerTest;
+import com.yahoo.vespa.config.server.tenant.Tenant;
+import com.yahoo.vespa.config.server.tenant.TenantRepository;
+import com.yahoo.vespa.config.server.zookeeper.ConfigCurator;
+import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.mock.MockCurator;
import org.junit.Before;
import org.junit.Rule;
@@ -19,9 +24,14 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.function.LongPredicate;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
@@ -30,7 +40,9 @@ import static org.junit.Assert.fail;
public class SessionRepositoryTest {
private File testApp = new File("src/test/apps/app");
- private SessionRepository repo;
+ private MockCurator curator;
+ private SessionRepository sessionRepository;
+ private TenantRepository tenantRepository;
private static final TenantName tenantName = TenantName.defaultName();
@Rule
@@ -49,44 +61,50 @@ public class SessionRepositoryTest {
IOUtils.copyDirectory(testApp, sessionsPath.resolve("2").toFile());
IOUtils.copyDirectory(testApp, sessionsPath.resolve("3").toFile());
}
+ curator = new MockCurator();
+ curator.create(TenantRepository.getTenantPath(tenantName).append("/applications"));
+ curator.create(TenantRepository.getSessionsPath(tenantName));
GlobalComponentRegistry globalComponentRegistry = new TestComponentRegistry.Builder()
- .curator(new MockCurator())
+ .curator(curator)
.configServerConfig(new ConfigserverConfig.Builder()
.configServerDBDir(configserverDbDir.getAbsolutePath())
.configDefinitionsDir(temporaryFolder.newFolder().getAbsolutePath())
.sessionLifetime(5)
.build())
.build();
+ tenantRepository = new TenantRepository(globalComponentRegistry, false);
+ TenantApplications applicationRepo = TenantApplications.create(globalComponentRegistry, tenantName);
SessionFactory sessionFactory = new SessionFactory(globalComponentRegistry,
- TenantApplications.create(globalComponentRegistry, tenantName),
+ applicationRepo,
new HostRegistry<>(),
tenantName);
- repo = new SessionRepository(tenantName, globalComponentRegistry, sessionFactory);
+ sessionRepository = new SessionRepository(tenantName, globalComponentRegistry, sessionFactory,
+ applicationRepo, applicationRepo);
}
@Test
public void require_that_sessions_can_be_loaded_from_disk() {
- assertNotNull(repo.getSession(1L));
- assertNotNull(repo.getSession(2L));
- assertNotNull(repo.getSession(3L));
- assertNull(repo.getSession(4L));
+ assertNotNull(sessionRepository.getSession(1L));
+ assertNotNull(sessionRepository.getSession(2L));
+ assertNotNull(sessionRepository.getSession(3L));
+ assertNull(sessionRepository.getSession(4L));
}
@Test
public void require_that_all_sessions_are_deleted() {
- repo.close();
- assertNull(repo.getSession(1L));
- assertNull(repo.getSession(2L));
- assertNull(repo.getSession(3L));
+ sessionRepository.close();
+ assertNull(sessionRepository.getSession(1L));
+ assertNull(sessionRepository.getSession(2L));
+ assertNull(sessionRepository.getSession(3L));
}
@Test
public void require_that_sessions_belong_to_a_tenant() {
// tenant is "default"
- assertNotNull(repo.getSession(1L));
- assertNotNull(repo.getSession(2L));
- assertNotNull(repo.getSession(3L));
- assertNull(repo.getSession(4L));
+ assertNotNull(sessionRepository.getSession(1L));
+ assertNotNull(sessionRepository.getSession(2L));
+ assertNotNull(sessionRepository.getSession(3L));
+ assertNull(sessionRepository.getSession(4L));
// tenant is "newTenant"
try {
@@ -94,12 +112,110 @@ public class SessionRepositoryTest {
} catch (Exception e) {
fail();
}
- assertNull(repo.getSession(1L));
+ assertNull(sessionRepository.getSession(1L));
- repo.addSession(new SessionHandlerTest.MockLocalSession(1L, FilesApplicationPackage.fromFile(testApp)));
- repo.addSession(new SessionHandlerTest.MockLocalSession(2L, FilesApplicationPackage.fromFile(testApp)));
- assertNotNull(repo.getSession(1L));
- assertNotNull(repo.getSession(2L));
- assertNull(repo.getSession(3L));
+ sessionRepository.addSession(new SessionHandlerTest.MockLocalSession(1L, FilesApplicationPackage.fromFile(testApp)));
+ sessionRepository.addSession(new SessionHandlerTest.MockLocalSession(2L, FilesApplicationPackage.fromFile(testApp)));
+ assertNotNull(sessionRepository.getSession(1L));
+ assertNotNull(sessionRepository.getSession(2L));
+ assertNull(sessionRepository.getSession(3L));
}
+
+
+ private void createSession(long sessionId, boolean wait) {
+ createSession(sessionId, wait, tenantName);
+ }
+
+ private void createSession(long sessionId, boolean wait, TenantName tenantName) {
+ com.yahoo.path.Path sessionsPath = TenantRepository.getSessionsPath(tenantName);
+ SessionZooKeeperClient zkc = new SessionZooKeeperClient(curator, sessionsPath.append(String.valueOf(sessionId)));
+ zkc.createNewSession(Instant.now());
+ if (wait) {
+ Curator.CompletionWaiter waiter = zkc.getUploadWaiter();
+ waiter.awaitCompletion(Duration.ofSeconds(120));
+ }
+ }
+
+ @Test
+ public void testInitialize() {
+ createSession(10L, false);
+ createSession(11L, false);
+ assertSessionExists(10L);
+ assertSessionExists(11L);
+ }
+
+ @Test
+ public void testCreateSession() {
+ createSession(12L, true);
+ assertSessionExists(12L);
+ }
+
+ @Test
+ public void testSessionStateChange() throws Exception {
+ long sessionId = 3L;
+ createSession(sessionId, true);
+ assertSessionStatus(sessionId, Session.Status.NEW);
+ assertStatusChange(sessionId, Session.Status.PREPARE);
+ assertStatusChange(sessionId, Session.Status.ACTIVATE);
+
+ com.yahoo.path.Path session = TenantRepository.getSessionsPath(tenantName).append("" + sessionId);
+ curator.delete(session);
+ assertSessionRemoved(sessionId);
+ assertNull(sessionRepository.getRemoteSession(sessionId));
+ }
+
+ // If reading a session throws an exception it should be handled and not prevent other applications
+ // from loading. In this test we just show that we end up with one session in remote session
+ // repo even if it had bad data (by making getSessionIdForApplication() in FailingTenantApplications
+ // throw an exception).
+ @Test
+ public void testBadApplicationRepoOnActivate() {
+ long sessionId = 3L;
+ TenantName mytenant = TenantName.from("mytenant");
+ curator.set(TenantRepository.getApplicationsPath(mytenant).append("mytenant:appX:default"), new byte[0]); // Invalid data
+ tenantRepository.addTenant(mytenant);
+ Tenant tenant = tenantRepository.getTenant(mytenant);
+ curator.create(TenantRepository.getSessionsPath(mytenant));
+ sessionRepository = tenant.getSessionRepo();
+ assertThat(sessionRepository.getRemoteSessions().size(), is(0));
+ createSession(sessionId, true, mytenant);
+ assertThat(sessionRepository.getRemoteSessions().size(), is(1));
+ }
+
+ private void assertStatusChange(long sessionId, Session.Status status) throws Exception {
+ com.yahoo.path.Path statePath = TenantRepository.getSessionsPath(tenantName).append("" + sessionId).append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH);
+ curator.create(statePath);
+ curator.framework().setData().forPath(statePath.getAbsolute(), Utf8.toBytes(status.toString()));
+ assertSessionStatus(sessionId, status);
+ }
+
+ private void assertSessionRemoved(long sessionId) {
+ waitFor(p -> sessionRepository.getRemoteSession(sessionId) == null, sessionId);
+ assertNull(sessionRepository.getRemoteSession(sessionId));
+ }
+
+ private void assertSessionExists(long sessionId) {
+ assertSessionStatus(sessionId, Session.Status.NEW);
+ }
+
+ private void assertSessionStatus(long sessionId, Session.Status status) {
+ waitFor(p -> sessionRepository.getRemoteSession(sessionId) != null &&
+ sessionRepository.getRemoteSession(sessionId).getStatus() == status, sessionId);
+ assertNotNull(sessionRepository.getRemoteSession(sessionId));
+ assertThat(sessionRepository.getRemoteSession(sessionId).getStatus(), is(status));
+ }
+
+ private void waitFor(LongPredicate predicate, long sessionId) {
+ long endTime = System.currentTimeMillis() + 60_000;
+ boolean ok;
+ do {
+ ok = predicate.test(sessionId);
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } while (System.currentTimeMillis() < endTime && !ok);
+ }
+
}