aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHÃ¥kon Hallingstad <hakon@verizonmedia.com>2021-01-26 09:15:15 +0100
committerGitHub <noreply@github.com>2021-01-26 09:15:15 +0100
commitf7e4a1308740c563e86abb1b13d831b923cf4858 (patch)
tree75890bb1d714604c84e12d1846fe90df90f8e2b9
parent3e69b7fb65c409c590ddc7bbe40b59c7f4256699 (diff)
parent9e590d92051a0e63933fb1221d059d1284ab7f85 (diff)
Merge pull request #16223 from vespa-engine/revert-16214-revert-16208-hmusum/use-synchronized-backing-for-sessions
Reapply "Use synchronized backing for sessions"
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java65
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java7
2 files changed, 40 insertions, 32 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index ae8b424136f..b5ba5b769d9 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -57,12 +57,14 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
@@ -85,9 +87,9 @@ public class SessionRepository {
private static final long nonExistingActiveSessionId = 0;
private final Object monitor = new Object();
- private final Map<Long, LocalSession> localSessionCache = new ConcurrentHashMap<>();
- private final Map<Long, RemoteSession> remoteSessionCache = new ConcurrentHashMap<>();
- private final Map<Long, SessionStateWatcher> sessionStateWatchers = new HashMap<>();
+ private final Map<Long, LocalSession> localSessionCache = Collections.synchronizedMap(new HashMap<>());
+ private final Map<Long, RemoteSession> remoteSessionCache = Collections.synchronizedMap(new HashMap<>());
+ private final Map<Long, SessionStateWatcher> sessionStateWatchers = Collections.synchronizedMap(new HashMap<>());
private final Duration sessionLifetime;
private final Clock clock;
private final Curator curator;
@@ -168,12 +170,11 @@ public class SessionRepository {
// ---------------- Local sessions ----------------------------------------------------------------
- public synchronized void addLocalSession(LocalSession session) {
+ public void addLocalSession(LocalSession session) {
long sessionId = session.getSessionId();
localSessionCache.put(sessionId, session);
- if (remoteSessionCache.get(sessionId) == null) {
+ if (remoteSessionCache.get(sessionId) == null)
createRemoteSession(sessionId);
- }
}
public LocalSession getLocalSession(long sessionId) {
@@ -292,13 +293,13 @@ public class SessionRepository {
return getSessionList(curator.getChildren(sessionsPath));
}
- public synchronized RemoteSession createRemoteSession(long sessionId) {
+ public RemoteSession createRemoteSession(long sessionId) {
SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId);
RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient);
- remoteSessionCache.put(sessionId, session);
- loadSessionIfActive(session);
- updateSessionStateWatcher(sessionId, session);
- return session;
+ RemoteSession newSession = loadSessionIfActive(session).orElse(session);
+ remoteSessionCache.put(sessionId, newSession);
+ updateSessionStateWatcher(sessionId, newSession);
+ return newSession;
}
public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) {
@@ -351,7 +352,7 @@ public class SessionRepository {
*
* @param sessionId session id for the new session
*/
- public synchronized void sessionAdded(long sessionId) {
+ public void sessionAdded(long sessionId) {
if (hasStatusDeleted(sessionId)) return;
log.log(Level.FINE, () -> "Adding remote session " + sessionId);
@@ -392,22 +393,16 @@ public class SessionRepository {
}
}
- private void sessionRemoved(long sessionId) {
- SessionStateWatcher watcher = sessionStateWatchers.remove(sessionId);
- if (watcher != null) watcher.close();
- remoteSessionCache.remove(sessionId);
- metricUpdater.incRemovedSessions();
- }
-
- private void loadSessionIfActive(RemoteSession session) {
+ private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) {
for (ApplicationId applicationId : applicationRepo.activeApplications()) {
if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) {
log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it");
applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId());
log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")");
- return;
+ return Optional.ofNullable(remoteSessionCache.get(session.getSessionId()));
}
}
+ return Optional.empty();
}
void prepareRemoteSession(RemoteSession session) {
@@ -528,6 +523,7 @@ public class SessionRepository {
public void deleteExpiredSessions(Map<ApplicationId, Long> activeSessions) {
log.log(Level.FINE, () -> "Purging old sessions for tenant '" + tenantName + "'");
+ Set<LocalSession> toDelete = new HashSet<>();
try {
for (LocalSession candidate : localSessionCache.values()) {
Instant createTime = candidate.getCreateTime();
@@ -535,19 +531,22 @@ public class SessionRepository {
// Sessions with state other than ACTIVATE
if (hasExpired(candidate) && !isActiveSession(candidate)) {
- deleteLocalSession(candidate);
+ toDelete.add(candidate);
} else if (createTime.plus(Duration.ofDays(1)).isBefore(clock.instant())) {
// Sessions with state ACTIVATE, but which are not actually active
Optional<ApplicationId> applicationId = candidate.getOptionalApplicationId();
if (applicationId.isEmpty()) continue;
Long activeSession = activeSessions.get(applicationId.get());
if (activeSession == null || activeSession != candidate.getSessionId()) {
- deleteLocalSession(candidate);
+ toDelete.add(candidate);
log.log(Level.INFO, "Deleted inactive session " + candidate.getSessionId() + " created " +
createTime + " for '" + applicationId + "'");
}
}
}
+
+ toDelete.forEach(this::deleteLocalSession);
+
// Make sure to catch here, to avoid executor just dying in case of issues ...
} catch (Throwable e) {
log.log(Level.WARNING, "Error when purging old sessions ", e);
@@ -556,7 +555,7 @@ public class SessionRepository {
}
private boolean hasExpired(LocalSession candidate) {
- return (candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant()));
+ return candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant());
}
private boolean isActiveSession(LocalSession candidate) {
@@ -782,16 +781,22 @@ public class SessionRepository {
}
}
- private synchronized void sessionsChanged() throws NumberFormatException {
+ private void sessionsChanged() throws NumberFormatException {
List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData());
checkForRemovedSessions(sessions);
checkForAddedSessions(sessions);
}
- private void checkForRemovedSessions(List<Long> sessions) {
- for (Session session : remoteSessionCache.values())
- if ( ! sessions.contains(session.getSessionId()))
- sessionRemoved(session.getSessionId());
+ private void checkForRemovedSessions(List<Long> existingSessions) {
+ for (Iterator<RemoteSession> it = remoteSessionCache.values().iterator(); it.hasNext(); ) {
+ long sessionId = it.next().sessionId;
+ if (existingSessions.contains(sessionId)) continue;
+
+ SessionStateWatcher watcher = sessionStateWatchers.remove(sessionId);
+ if (watcher != null) watcher.close();
+ it.remove();
+ metricUpdater.incRemovedSessions();
+ }
}
private void checkForAddedSessions(List<Long> sessions) {
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
index 392bc373c19..b95cc068308 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
@@ -172,7 +172,7 @@ public class TenantRepository {
this.hostRegistry = hostRegistry;
this.configserverConfig = configserverConfig;
this.bootstrapExecutor = Executors.newFixedThreadPool(configserverConfig.numParallelTenantLoaders(),
- new DaemonThreadFactory("bootstrap tenants"));
+ new DaemonThreadFactory("bootstrap-tenant-"));
this.curator = curator;
this.metrics = metrics;
metricUpdater = metrics.getOrCreateMetricUpdater(Collections.emptyMap());
@@ -302,6 +302,8 @@ public class TenantRepository {
private Tenant createTenant(TenantName tenantName, Instant created) {
if (tenants.containsKey(tenantName)) return getTenant(tenantName);
+ Instant start = Instant.now();
+ log.log(Level.FINE, "Adding tenant '" + tenantName);
TenantApplications applicationRepo =
new TenantApplications(tenantName,
curator,
@@ -342,7 +344,8 @@ public class TenantRepository {
modelFactoryRegistry,
configDefinitionRepo,
tenantListener);
- log.log(Level.INFO, "Adding tenant '" + tenantName + "'" + ", created " + created);
+ log.log(Level.INFO, "Adding tenant '" + tenantName + "'" + ", created " + created +
+ ". Bootstrapping in " + Duration.between(start, Instant.now()));
Tenant tenant = new Tenant(tenantName, sessionRepository, applicationRepo, applicationRepo, created);
createAndWriteTenantMetaData(tenant);
tenants.putIfAbsent(tenantName, tenant);