-rw-r--r--  client/go/cmd/logfmt/cmd.go | 2
-rw-r--r--  container-core/abi-spec.json | 45
-rw-r--r--  container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java | 17
-rw-r--r--  container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollector.java | 17
-rw-r--r--  container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java | 7
-rw-r--r--  container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java | 114
-rw-r--r--  container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java | 58
-rw-r--r--  container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/TlsClientAuthenticationEnforcer.java | 41
-rw-r--r--  container-core/src/main/resources/configdefinitions/jdisc.http.jdisc.http.connector.def | 8
-rw-r--r--  container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollectorTest.java | 3
-rw-r--r--  container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java | 20
-rw-r--r--  controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java | 13
-rw-r--r--  document/src/main/java/com/yahoo/document/json/ParsedDocumentOperation.java | 49
-rw-r--r--  flags/src/main/java/com/yahoo/vespa/flags/Flags.java | 2
-rw-r--r--  searchlib/CMakeLists.txt | 1
-rw-r--r--  searchlib/src/tests/tensor/direct_tensor_store/direct_tensor_store_test.cpp | 8
-rw-r--r--  searchlib/src/tests/tensor/tensor_buffer_type_mapper/CMakeLists.txt | 9
-rw-r--r--  searchlib/src/tests/tensor/tensor_buffer_type_mapper/tensor_buffer_type_mapper_test.cpp | 121
-rw-r--r--  searchlib/src/vespa/searchlib/memoryindex/feature_store.cpp | 3
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/CMakeLists.txt | 1
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp | 6
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/direct_tensor_saver.cpp | 2
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/direct_tensor_store.h | 2
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp | 288
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_store.h | 84
-rw-r--r--  vespalib/CMakeLists.txt | 1
-rw-r--r--  vespalib/src/tests/datastore/buffer_stats/CMakeLists.txt | 9
-rw-r--r--  vespalib/src/tests/datastore/buffer_stats/buffer_stats_test.cpp | 34
-rw-r--r--  vespalib/src/tests/datastore/datastore/datastore_test.cpp | 18
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/buffer_stats.cpp | 20
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/buffer_stats.h | 12
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/bufferstate.cpp | 33
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/bufferstate.h | 29
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/datastore.h | 12
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/datastore.hpp | 42
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/datastorebase.cpp | 54
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/datastorebase.h | 72
-rw-r--r--  zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java | 353
-rw-r--r--  zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java | 920
-rw-r--r--  zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java | 2
40 files changed, 1780 insertions, 752 deletions
diff --git a/client/go/cmd/logfmt/cmd.go b/client/go/cmd/logfmt/cmd.go
index b549c5fa7ef..84322c7ff08 100644
--- a/client/go/cmd/logfmt/cmd.go
+++ b/client/go/cmd/logfmt/cmd.go
@@ -38,7 +38,7 @@ and converts it to something human-readable`,
cmd.Flags().StringVarP(&curOptions.OnlyHostname, "host", "H", "", "select only one host")
cmd.Flags().StringVarP(&curOptions.OnlyPid, "pid", "p", "", "select only one process ID")
cmd.Flags().StringVarP(&curOptions.OnlyService, "service", "S", "", "select only one service")
- cmd.Flags().VarP(&curOptions.Format, "format", "F", "select logfmt output format, vespa (default), json or raw are supported")
+ cmd.Flags().VarP(&curOptions.Format, "format", "F", "select logfmt output format, vespa (default), json or raw are supported. The json output format is not stable, and will change in the future.")
cmd.Flags().MarkHidden("tc")
cmd.Flags().MarkHidden("ts")
cmd.Flags().MarkHidden("dequotenewlines")
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json
index 5985e79b786..d324656abf2 100644
--- a/container-core/abi-spec.json
+++ b/container-core/abi-spec.json
@@ -1051,8 +1051,6 @@
"public com.yahoo.jdisc.http.ConnectorConfig$Builder healthCheckProxy(java.util.function.Consumer)",
"public com.yahoo.jdisc.http.ConnectorConfig$Builder proxyProtocol(com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder)",
"public com.yahoo.jdisc.http.ConnectorConfig$Builder proxyProtocol(java.util.function.Consumer)",
- "public com.yahoo.jdisc.http.ConnectorConfig$Builder secureRedirect(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder)",
- "public com.yahoo.jdisc.http.ConnectorConfig$Builder secureRedirect(java.util.function.Consumer)",
"public com.yahoo.jdisc.http.ConnectorConfig$Builder maxRequestsPerConnection(int)",
"public com.yahoo.jdisc.http.ConnectorConfig$Builder maxConnectionLife(double)",
"public com.yahoo.jdisc.http.ConnectorConfig$Builder http2Enabled(boolean)",
@@ -1074,7 +1072,6 @@
"public com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer$Builder tlsClientAuthEnforcer",
"public com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy$Builder healthCheckProxy",
"public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder proxyProtocol",
- "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder secureRedirect",
"public com.yahoo.jdisc.http.ConnectorConfig$Http2$Builder http2",
"public com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder serverName"
]
@@ -1193,37 +1190,6 @@
],
"fields": []
},
- "com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder": {
- "superClass": "java.lang.Object",
- "interfaces": [
- "com.yahoo.config.ConfigBuilder"
- ],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>()",
- "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect)",
- "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder enabled(boolean)",
- "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder port(int)",
- "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect build()"
- ],
- "fields": []
- },
- "com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect": {
- "superClass": "com.yahoo.config.InnerNode",
- "interfaces": [],
- "attributes": [
- "public",
- "final"
- ],
- "methods": [
- "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder)",
- "public boolean enabled()",
- "public int port()"
- ],
- "fields": []
- },
"com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder": {
"superClass": "java.lang.Object",
"interfaces": [
@@ -1236,9 +1202,13 @@
"public void <init>()",
"public void <init>(com.yahoo.jdisc.http.ConnectorConfig$ServerName)",
"public com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder fallback(java.lang.String)",
+ "public com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder allowed(java.lang.String)",
+ "public com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder allowed(java.util.Collection)",
"public com.yahoo.jdisc.http.ConnectorConfig$ServerName build()"
],
- "fields": []
+ "fields": [
+ "public java.util.List allowed"
+ ]
},
"com.yahoo.jdisc.http.ConnectorConfig$ServerName": {
"superClass": "com.yahoo.config.InnerNode",
@@ -1249,7 +1219,9 @@
],
"methods": [
"public void <init>(com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder)",
- "public java.lang.String fallback()"
+ "public java.lang.String fallback()",
+ "public java.util.List allowed()",
+ "public java.lang.String allowed(int)"
],
"fields": []
},
@@ -1443,7 +1415,6 @@
"public com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer tlsClientAuthEnforcer()",
"public com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy healthCheckProxy()",
"public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol proxyProtocol()",
- "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect secureRedirect()",
"public int maxRequestsPerConnection()",
"public double maxConnectionLife()",
"public boolean http2Enabled()",
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java
index bf278981b69..6282e334409 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java
@@ -9,7 +9,6 @@ import com.yahoo.jdisc.http.ssl.impl.DefaultConnectorSsl;
import com.yahoo.security.tls.MixedMode;
import com.yahoo.security.tls.TransportSecurityUtils;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
-import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
@@ -57,7 +56,6 @@ public class ConnectorFactory {
// e.g. due to TLS configuration through environment variables.
private static void runtimeConnectorConfigValidation(ConnectorConfig config) {
validateProxyProtocolConfiguration(config);
- validateSecureRedirectConfig(config);
}
private static void validateProxyProtocolConfiguration(ConnectorConfig config) {
@@ -70,28 +68,15 @@ public class ConnectorFactory {
}
}
- private static void validateSecureRedirectConfig(ConnectorConfig config) {
- if (config.secureRedirect().enabled() && isSslEffectivelyEnabled(config)) {
- throw new IllegalArgumentException("Secure redirect can only be enabled on connectors without HTTPS");
- }
- }
-
public ConnectorConfig getConnectorConfig() {
return connectorConfig;
}
public ServerConnector createConnector(final Metric metric, final Server server, JettyConnectionLogger connectionLogger,
ConnectionMetricAggregator connectionMetricAggregator) {
- ServerConnector connector = new JDiscServerConnector(
+ return new JDiscServerConnector(
connectorConfig, metric, server, connectionLogger, connectionMetricAggregator,
createConnectionFactories(metric).toArray(ConnectionFactory[]::new));
- connector.setPort(connectorConfig.listenPort());
- connector.setName(connectorConfig.name());
- connector.setAcceptQueueSize(connectorConfig.acceptQueueSize());
- connector.setReuseAddress(connectorConfig.reuseAddress());
- connector.setIdleTimeout(toMillis(connectorConfig.idleTimeout()));
- connector.addBean(HttpCompliance.RFC7230);
- return connector;
}
private List<ConnectionFactory> createConnectionFactories(Metric metric) {
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollector.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollector.java
index 22c5b2ebfdb..3fb81cb5352 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollector.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollector.java
@@ -3,6 +3,7 @@ package com.yahoo.jdisc.http.server.jetty;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.http.HttpRequest;
+import com.yahoo.jdisc.http.ServerConfig;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.AsyncContextEvent;
@@ -22,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -52,13 +52,20 @@ class HttpResponseStatisticsCollector extends HandlerWrapper implements Graceful
private final AtomicReference<FutureCallback> shutdown = new AtomicReference<>();
private final List<String> monitoringHandlerPaths;
private final List<String> searchHandlerPaths;
+ private final Set<String> ignoredUserAgents;
private final AtomicLong inFlight = new AtomicLong();
private final ConcurrentMap<StatusCodeMetric, LongAdder> statistics = new ConcurrentHashMap<>();
- HttpResponseStatisticsCollector(List<String> monitoringHandlerPaths, List<String> searchHandlerPaths) {
+ HttpResponseStatisticsCollector(ServerConfig.Metric cfg) {
+ this(cfg.monitoringHandlerPaths(), cfg.searchHandlerPaths(), cfg.ignoredUserAgents());
+ }
+
+ HttpResponseStatisticsCollector(List<String> monitoringHandlerPaths, List<String> searchHandlerPaths,
+ Collection<String> ignoredUserAgents) {
this.monitoringHandlerPaths = monitoringHandlerPaths;
this.searchHandlerPaths = searchHandlerPaths;
+ this.ignoredUserAgents = Set.copyOf(ignoredUserAgents);
}
private final AsyncListener completionWatcher = new AsyncListener() {
@@ -108,12 +115,6 @@ class HttpResponseStatisticsCollector extends HandlerWrapper implements Graceful
}
}
- void ignoreUserAgent(String agentName) {
- ignoredUserAgents.add(agentName);
- }
-
- private Set<String> ignoredUserAgents = new HashSet<>();
-
private boolean shouldLogMetricsFor(Request request) {
String agent = request.getHeader(HttpHeader.USER_AGENT.toString());
if (agent == null) return true;
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java
index 79cdb8f67cf..49db22c3e38 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java
@@ -3,6 +3,7 @@ package com.yahoo.jdisc.http.server.jetty;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.http.ConnectorConfig;
+import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Server;
@@ -50,6 +51,12 @@ class JDiscServerConnector extends ServerConnector {
}
addBean(connectionLogger);
addBean(connectionMetricAggregator);
+ setPort(config.listenPort());
+ setName(config.name());
+ setAcceptQueueSize(config.acceptQueueSize());
+ setReuseAddress(config.reuseAddress());
+ setIdleTimeout((long) (config.idleTimeout() * 1000));
+ addBean(HttpCompliance.RFC7230);
}
@Override
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java
index 96c5bac335b..965575f8b30 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java
@@ -14,11 +14,13 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.jmx.ConnectorServer;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.server.handler.gzip.GzipHttpOutputInterceptor;
@@ -83,20 +85,13 @@ public class JettyHttpServer extends AbstractServerProvider {
listenedPorts.add(connectorConfig.listenPort());
}
- JDiscContext jDiscContext = new JDiscContext(filterBindings,
- container,
- janitor,
- metric,
- serverConfig);
+ JDiscContext jDiscContext = new JDiscContext(filterBindings, container, janitor, metric, serverConfig);
ServletHolder jdiscServlet = new ServletHolder(new JDiscHttpServlet(jDiscContext));
List<JDiscServerConnector> connectors = Arrays.stream(server.getConnectors())
.map(JDiscServerConnector.class::cast)
.collect(toList());
-
- server.setHandler(getHandlerCollection(serverConfig,
- connectors,
- jdiscServlet));
+ server.setHandler(createRootHandler(serverConfig, connectors, jdiscServlet));
this.metricsReporter = new ServerMetricReporter(metric, server);
}
@@ -136,46 +131,36 @@ public class JettyHttpServer extends AbstractServerProvider {
}
}
- private HandlerCollection getHandlerCollection(ServerConfig serverConfig,
- List<JDiscServerConnector> connectors,
- ServletHolder jdiscServlet) {
- ServletContextHandler servletContextHandler = createServletContextHandler();
- servletContextHandler.addServlet(jdiscServlet, "/*");
-
- List<ConnectorConfig> connectorConfigs = connectors.stream().map(JDiscServerConnector::connectorConfig).collect(toList());
- var secureRedirectHandler = new SecuredRedirectHandler(connectorConfigs);
- secureRedirectHandler.setHandler(servletContextHandler);
-
- var proxyHandler = new HealthCheckProxyHandler(connectors);
- proxyHandler.setHandler(secureRedirectHandler);
-
- var authEnforcer = new TlsClientAuthenticationEnforcer(connectorConfigs);
- authEnforcer.setHandler(proxyHandler);
-
- GzipHandler gzipHandler = newGzipHandler(serverConfig);
- gzipHandler.setHandler(authEnforcer);
-
- HttpResponseStatisticsCollector statisticsCollector =
- new HttpResponseStatisticsCollector(serverConfig.metric().monitoringHandlerPaths(),
- serverConfig.metric().searchHandlerPaths());
- statisticsCollector.setHandler(gzipHandler);
- for (String agent : serverConfig.metric().ignoredUserAgents()) {
- statisticsCollector.ignoreUserAgent(agent);
+ private HandlerCollection createRootHandler(
+ ServerConfig serverCfg, List<JDiscServerConnector> connectors, ServletHolder jdiscServlet) {
+ List<ContextHandler> perConnectorHandlers = new ArrayList<>();
+ for (JDiscServerConnector connector : connectors) {
+ ConnectorConfig connectorCfg = connector.connectorConfig();
+ List<HandlerWrapper> chain = new ArrayList<>();
+ chain.add(newGenericStatisticsHandler());
+ chain.add(newResponseStatisticsHandler(serverCfg));
+ chain.add(newGzipHandler(serverCfg));
+ if (connectorCfg.tlsClientAuthEnforcer().enable()) {
+ chain.add(newTlsClientAuthEnforcerHandler(connectorCfg));
+ }
+ if (connectorCfg.healthCheckProxy().enable()) {
+ chain.add(newHealthCheckProxyHandler(connectors));
+ } else {
+ chain.add(newServletHandler(jdiscServlet));
+ }
+ ContextHandler connectorRoot = newConnectorContextHandler(connector, connectorCfg);
+ addChainToRoot(connectorRoot, chain);
+ perConnectorHandlers.add(connectorRoot);
}
-
- StatisticsHandler statisticsHandler = newStatisticsHandler();
- statisticsHandler.setHandler(statisticsCollector);
-
- HandlerCollection handlerCollection = new HandlerCollection();
- handlerCollection.setHandlers(new Handler[] { statisticsHandler });
- return handlerCollection;
+ return new ContextHandlerCollection(perConnectorHandlers.toArray(new ContextHandler[0]));
}
- private ServletContextHandler createServletContextHandler() {
- ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS);
- servletContextHandler.setContextPath("/");
- servletContextHandler.setDisplayName(getDisplayName(listenedPorts));
- return servletContextHandler;
+ private static void addChainToRoot(ContextHandler root, List<HandlerWrapper> chain) {
+ HandlerWrapper parent = root;
+ for (HandlerWrapper h : chain) {
+ parent.setHandler(h);
+ parent = h;
+ }
}
private static String getDisplayName(List<Integer> ports) {
@@ -237,13 +222,44 @@ public class JettyHttpServer extends AbstractServerProvider {
Server server() { return server; }
- private StatisticsHandler newStatisticsHandler() {
+ private ServletContextHandler newServletHandler(ServletHolder servlet) {
+ var h = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS);
+ h.setContextPath("/");
+ h.setDisplayName(getDisplayName(listenedPorts));
+ h.addServlet(servlet, "/*");
+ return h;
+ }
+
+ private static ContextHandler newConnectorContextHandler(JDiscServerConnector connector, ConnectorConfig connectorCfg) {
+ ContextHandler ctxHandler = new ContextHandler();
+ List<String> allowedServerNames = connectorCfg.serverName().allowed();
+ if (allowedServerNames.isEmpty()) {
+ ctxHandler.setVirtualHosts(new String[]{"@%s".formatted(connector.getName())});
+ } else {
+ ctxHandler.setVirtualHosts(allowedServerNames.toArray(new String[0]));
+ }
+ return ctxHandler;
+ }
+
+ private static HealthCheckProxyHandler newHealthCheckProxyHandler(List<JDiscServerConnector> connectors) {
+ return new HealthCheckProxyHandler(connectors);
+ }
+
+ private static TlsClientAuthenticationEnforcer newTlsClientAuthEnforcerHandler(ConnectorConfig cfg) {
+ return new TlsClientAuthenticationEnforcer(cfg.tlsClientAuthEnforcer());
+ }
+
+ private static HttpResponseStatisticsCollector newResponseStatisticsHandler(ServerConfig cfg) {
+ return new HttpResponseStatisticsCollector(cfg.metric());
+ }
+
+ private static StatisticsHandler newGenericStatisticsHandler() {
StatisticsHandler statisticsHandler = new StatisticsHandler();
statisticsHandler.statsReset();
return statisticsHandler;
}
- private GzipHandler newGzipHandler(ServerConfig serverConfig) {
+ private static GzipHandler newGzipHandler(ServerConfig serverConfig) {
GzipHandler gzipHandler = new GzipHandlerWithVaryHeaderFixed();
gzipHandler.setCompressionLevel(serverConfig.responseCompressionLevel());
gzipHandler.setInflateBufferSize(8 * 1024);
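
The JettyHttpServer refactoring above replaces the single shared handler chain with one chain per connector, each wrapped in a ContextHandler that Jetty selects by connector name (the "@<connector name>" virtual-host syntax) or, when serverName.allowed is configured, by the allowed server names. As a hedged illustration of that underlying Jetty mechanism only, and not part of this patch, the following standalone sketch uses made-up connector names and ports and assumes Jetty 9.x/10.x with javax.servlet, matching the imports used elsewhere in this diff:

import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

public class PerConnectorRoutingSketch {

    public static void main(String[] args) throws Exception {
        Server server = new Server();

        ServerConnector plain = new ServerConnector(server);
        plain.setPort(8080);
        plain.setName("plain");                       // referenced by "@plain" below

        ServerConnector admin = new ServerConnector(server);
        admin.setPort(8081);
        admin.setName("admin");

        server.addConnector(plain);
        server.addConnector(admin);

        // One context per connector, selected by connector name rather than by Host header.
        ContextHandler plainCtx = new ContextHandler("/");
        plainCtx.setVirtualHosts(new String[] { "@plain" });
        plainCtx.setHandler(respondWith("served by the 'plain' connector chain"));

        ContextHandler adminCtx = new ContextHandler("/");
        adminCtx.setVirtualHosts(new String[] { "@admin" });
        adminCtx.setHandler(respondWith("served by the 'admin' connector chain"));

        server.setHandler(new ContextHandlerCollection(plainCtx, adminCtx));
        server.start();
        server.join();
    }

    private static AbstractHandler respondWith(String message) {
        return new AbstractHandler() {
            @Override
            public void handle(String target, Request baseRequest, HttpServletRequest request,
                               HttpServletResponse response) throws IOException {
                response.setStatus(200);
                response.getWriter().println(message);
                baseRequest.setHandled(true);
            }
        };
    }
}
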
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java
deleted file mode 100644
index e5dddf285ef..00000000000
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.jdisc.http.server.jetty;
-
-import com.yahoo.jdisc.http.ConnectorConfig;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.handler.HandlerWrapper;
-import org.eclipse.jetty.util.URIUtil;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static com.yahoo.jdisc.http.server.jetty.RequestUtils.getConnectorLocalPort;
-
-/**
- * A secure redirect handler inspired by {@link org.eclipse.jetty.server.handler.SecuredRedirectHandler}.
- *
- * @author bjorncs
- */
-class SecuredRedirectHandler extends HandlerWrapper {
-
- private static final String HEALTH_CHECK_PATH = "/status.html";
-
- private final Map<Integer, Integer> redirectMap;
-
- SecuredRedirectHandler(List<ConnectorConfig> connectorConfigs) {
- this.redirectMap = createRedirectMap(connectorConfigs);
- }
-
- @Override
- public void handle(String target, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException {
- int localPort = getConnectorLocalPort(request);
- if (!redirectMap.containsKey(localPort)) {
- _handler.handle(target, request, servletRequest, servletResponse);
- return;
- }
- servletResponse.setContentLength(0);
- if (!servletRequest.getRequestURI().equals(HEALTH_CHECK_PATH)) {
- servletResponse.sendRedirect(
- URIUtil.newURI("https", request.getServerName(), redirectMap.get(localPort), request.getRequestURI(), request.getQueryString()));
- }
- request.setHandled(true);
- }
-
- private static Map<Integer, Integer> createRedirectMap(List<ConnectorConfig> connectorConfigs) {
- var redirectMap = new HashMap<Integer, Integer>();
- for (ConnectorConfig connectorConfig : connectorConfigs) {
- if (connectorConfig.secureRedirect().enabled()) {
- redirectMap.put(connectorConfig.listenPort(), connectorConfig.secureRedirect().port());
- }
- }
- return redirectMap;
- }
-}
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/TlsClientAuthenticationEnforcer.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/TlsClientAuthenticationEnforcer.java
index ce949074bfa..b420aabc598 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/TlsClientAuthenticationEnforcer.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/TlsClientAuthenticationEnforcer.java
@@ -11,11 +11,6 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static com.yahoo.jdisc.http.server.jetty.RequestUtils.getConnectorLocalPort;
/**
* A Jetty handler that enforces TLS client authentication with configurable white list.
@@ -24,10 +19,11 @@ import static com.yahoo.jdisc.http.server.jetty.RequestUtils.getConnectorLocalPo
*/
class TlsClientAuthenticationEnforcer extends HandlerWrapper {
- private final Map<Integer, List<String>> portToWhitelistedPathsMapping;
+ private final ConnectorConfig.TlsClientAuthEnforcer cfg;
- TlsClientAuthenticationEnforcer(List<ConnectorConfig> connectorConfigs) {
- portToWhitelistedPathsMapping = createWhitelistMapping(connectorConfigs);
+ TlsClientAuthenticationEnforcer(ConnectorConfig.TlsClientAuthEnforcer cfg) {
+ if (!cfg.enable()) throw new IllegalArgumentException();
+ this.cfg = cfg;
}
@Override
@@ -44,36 +40,11 @@ class TlsClientAuthenticationEnforcer extends HandlerWrapper {
}
}
- private static Map<Integer, List<String>> createWhitelistMapping(List<ConnectorConfig> connectorConfigs) {
- var mapping = new HashMap<Integer, List<String>>();
- for (ConnectorConfig connectorConfig : connectorConfigs) {
- var enforcerConfig = connectorConfig.tlsClientAuthEnforcer();
- if (enforcerConfig.enable()) {
- mapping.put(connectorConfig.listenPort(), enforcerConfig.pathWhitelist());
- }
- }
- return mapping;
- }
-
- private boolean isRequest(Request request) {
- return request.getDispatcherType() == DispatcherType.REQUEST;
- }
+ private boolean isRequest(Request request) { return request.getDispatcherType() == DispatcherType.REQUEST; }
private boolean isRequestToWhitelistedBinding(Request jettyRequest) {
- int localPort = getConnectorLocalPort(jettyRequest);
- List<String> whiteListedPaths = getWhitelistedPathsForPort(localPort);
- if (whiteListedPaths == null) {
- return true; // enforcer not enabled
- }
// Note: Same path definition as HttpRequestFactory.getUri()
- return whiteListedPaths.contains(jettyRequest.getRequestURI());
- }
-
- private List<String> getWhitelistedPathsForPort(int localPort) {
- if (portToWhitelistedPathsMapping.containsKey(0) && portToWhitelistedPathsMapping.size() == 1) {
- return portToWhitelistedPathsMapping.get(0); // for unit tests which uses 0 for listen port
- }
- return portToWhitelistedPathsMapping.get(localPort);
+ return cfg.pathWhitelist().contains(jettyRequest.getRequestURI());
}
private boolean isClientAuthenticated(HttpServletRequest servletRequest) {
diff --git a/container-core/src/main/resources/configdefinitions/jdisc.http.jdisc.http.connector.def b/container-core/src/main/resources/configdefinitions/jdisc.http.jdisc.http.connector.def
index 1f4763d32a7..ecbc451ead1 100644
--- a/container-core/src/main/resources/configdefinitions/jdisc.http.jdisc.http.connector.def
+++ b/container-core/src/main/resources/configdefinitions/jdisc.http.jdisc.http.connector.def
@@ -116,12 +116,6 @@ proxyProtocol.enabled bool default=false
# Allow https in parallel with proxy protocol
proxyProtocol.mixedMode bool default=false
-# Redirect all requests to https port
-secureRedirect.enabled bool default=false
-
-# Target port for redirect
-secureRedirect.port int default=443
-
# Maximum number of request per connection before server marks connections as non-persistent. Set to '0' to disable.
maxRequestsPerConnection int default=0
@@ -138,3 +132,5 @@ http2.maxConcurrentStreams int default=4096
# Override the default server name when authority is missing from request.
serverName.fallback string default=""
+# The list of accepted server names. Empty list to accept any. Elements follows format of 'serverName.default'.
+serverName.allowed[] string
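
The new serverName.allowed[] setting above, exposed through the allowed(...) builder methods shown in the abi-spec.json changes, restricts which server names a connector will serve; requests for other names no longer match the connector's context and are answered with 404, as the acceptedServerNamesCanBeRestricted test added to HttpServerTest.java further down exercises. As a small hedged sketch only (example host names, not part of the patch), the config can be built programmatically like this:

import com.yahoo.jdisc.http.ConnectorConfig;

import java.util.List;

class ServerNameConfigSketch {
    // Accept requests only for the given server names; an empty list keeps the default of accepting any.
    static ConnectorConfig.Builder restrictedTo(List<String> serverNames) {
        return new ConnectorConfig.Builder()
                .serverName(new ConnectorConfig.ServerName.Builder()
                        .fallback("localhost")     // example fallback when a request carries no authority
                        .allowed(serverNames));    // the new allowed(java.util.Collection) builder method
    }
}
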
diff --git a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollectorTest.java b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollectorTest.java
index 1f65bc4f582..165659389ec 100644
--- a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollectorTest.java
+++ b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollectorTest.java
@@ -25,6 +25,7 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -38,7 +39,7 @@ public class HttpResponseStatisticsCollectorTest {
private Connector connector;
private List<String> monitoringPaths = List.of("/status.html");
private List<String> searchPaths = List.of("/search");
- private HttpResponseStatisticsCollector collector = new HttpResponseStatisticsCollector(monitoringPaths, searchPaths);
+ private HttpResponseStatisticsCollector collector = new HttpResponseStatisticsCollector(monitoringPaths, searchPaths, Set.of());
private int httpResponseCode = 500;
@Test
diff --git a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
index 2c5d36bd776..318067ac634 100644
--- a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
+++ b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java
@@ -743,7 +743,7 @@ public class HttpServerTest {
}
@Test
- void requestThatFallbackServerNameCanBeOverridden() throws Exception {
+ void fallbackServerNameCanBeOverridden() throws Exception {
String fallbackHostname = "myhostname";
JettyTestDriver driver = JettyTestDriver.newConfiguredInstance(
new UriRequestHandler(),
@@ -752,13 +752,29 @@ public class HttpServerTest {
.serverName(new ConnectorConfig.ServerName.Builder().fallback(fallbackHostname)));
int listenPort = driver.server().getListenPort();
HttpGet req = new HttpGet("http://localhost:" + listenPort + "/");
- req.addHeader("Host", null);
+ req.setHeader("Host", null);
driver.client().execute(req)
.expectStatusCode(is(OK))
.expectContent(containsString("http://" + fallbackHostname + ":" + listenPort + "/"));
assertTrue(driver.close());
}
+ @Test
+ void acceptedServerNamesCanBeRestricted() throws Exception {
+ String requiredServerName = "myhostname";
+ JettyTestDriver driver = JettyTestDriver.newConfiguredInstance(
+ new EchoRequestHandler(),
+ new ServerConfig.Builder(),
+ new ConnectorConfig.Builder()
+ .serverName(new ConnectorConfig.ServerName.Builder().allowed(requiredServerName)));
+ int listenPort = driver.server().getListenPort();
+ HttpGet req = new HttpGet("http://localhost:" + listenPort + "/");
+ req.setHeader("Host", requiredServerName);
+ driver.client().execute(req).expectStatusCode(is(OK));
+ driver.client().get("/").expectStatusCode(is(NOT_FOUND));
+ assertTrue(driver.close());
+ }
+
private static JettyTestDriver createSslWithTlsClientAuthenticationEnforcer(Path certificateFile, Path privateKeyFile) {
ConnectorConfig.Builder connectorConfig = new ConnectorConfig.Builder()
.tlsClientAuthEnforcer(
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java
index 0d4b2c658cf..36036d6d36d 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java
@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -767,8 +768,16 @@ public class JobController {
.filter(versions::contains) // Don't deploy versions that are no longer known.
.ifPresent(versions::add);
- if (versions.isEmpty())
- throw new IllegalStateException("no deployable platform version found in the system");
+ // Remove all versions that are older than the compile version.
+ versions.removeIf(version -> applicationPackage.compileVersion().map(version::isBefore).orElse(false));
+ if (versions.isEmpty()) {
+ // Fall back to the newest deployable version, if all the ones with normal confidence were too old.
+ Iterator<VespaVersion> descending = reversed(versionStatus.deployableVersions()).iterator();
+ if ( ! descending.hasNext())
+ throw new IllegalStateException("no deployable platform version found in the system");
+ else
+ versions.add(descending.next().versionNumber());
+ }
VersionCompatibility compatibility = controller.applications().versionCompatibility(id.applicationId());
List<Version> compatibleVersions = new ArrayList<>();
diff --git a/document/src/main/java/com/yahoo/document/json/ParsedDocumentOperation.java b/document/src/main/java/com/yahoo/document/json/ParsedDocumentOperation.java
index eb171983cf1..a395973a55a 100644
--- a/document/src/main/java/com/yahoo/document/json/ParsedDocumentOperation.java
+++ b/document/src/main/java/com/yahoo/document/json/ParsedDocumentOperation.java
@@ -3,14 +3,49 @@ package com.yahoo.document.json;
import com.yahoo.document.DocumentOperation;
+import java.util.Objects;
+
/**
* The result of JSON parsing a single document operation
- *
- * @param operation
- * the parsed operation
- * @param fullyApplied
- * true if all the JSON content could be applied,
- * false if some (or all) of the fields were not poresent in this document and was ignored
*/
-public record ParsedDocumentOperation(DocumentOperation operation, boolean fullyApplied) {
+public final class ParsedDocumentOperation {
+
+ private final DocumentOperation operation;
+ private final boolean fullyApplied;
+
+ /**
+ * @param operation the parsed operation
+ * @param fullyApplied true if all the JSON content could be applied,
+ * false if some (or all) of the fields were not poresent in this document and was ignored
+ */
+ public ParsedDocumentOperation(DocumentOperation operation, boolean fullyApplied) {
+ this.operation = operation;
+ this.fullyApplied = fullyApplied;
+ }
+
+ public DocumentOperation operation() { return operation; }
+
+ public boolean fullyApplied() { return fullyApplied; }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) return true;
+ if (obj == null || obj.getClass() != this.getClass()) return false;
+ var that = (ParsedDocumentOperation) obj;
+ return Objects.equals(this.operation, that.operation) &&
+ this.fullyApplied == that.fullyApplied;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(operation, fullyApplied);
+ }
+
+ @Override
+ public String toString() {
+ return "ParsedDocumentOperation[" +
+ "operation=" + operation + ", " +
+ "fullyApplied=" + fullyApplied + ']';
+ }
+
}
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index 5766b059d98..feac5d668e7 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -79,7 +79,7 @@ public class Flags {
public static final UnboundBooleanFlag KEEP_STORAGE_NODE_UP = defineFeatureFlag(
"keep-storage-node-up", true,
- List.of("hakonhall"), "2022-07-07", "2022-10-07",
+ List.of("hakonhall"), "2022-07-07", "2022-11-07",
"Whether to leave the storage node (with wanted state) UP while the node is permanently down.",
"Takes effect immediately for nodes transitioning to permanently down.",
ZONE_ID, APPLICATION_ID);
diff --git a/searchlib/CMakeLists.txt b/searchlib/CMakeLists.txt
index a7d831aa623..62aca6d68cc 100644
--- a/searchlib/CMakeLists.txt
+++ b/searchlib/CMakeLists.txt
@@ -225,6 +225,7 @@ vespa_define_module(
src/tests/tensor/hnsw_saver
src/tests/tensor/tensor_buffer_operations
src/tests/tensor/tensor_buffer_store
+ src/tests/tensor/tensor_buffer_type_mapper
src/tests/transactionlog
src/tests/transactionlogstress
src/tests/true
diff --git a/searchlib/src/tests/tensor/direct_tensor_store/direct_tensor_store_test.cpp b/searchlib/src/tests/tensor/direct_tensor_store/direct_tensor_store_test.cpp
index 1574e7d38f1..8b21952b2d1 100644
--- a/searchlib/src/tests/tensor/direct_tensor_store/direct_tensor_store_test.cpp
+++ b/searchlib/src/tests/tensor/direct_tensor_store/direct_tensor_store_test.cpp
@@ -62,7 +62,7 @@ public:
}
void expect_tensor(const Value* exp, EntryRef ref) {
- const auto* act = store.get_tensor(ref);
+ const auto* act = store.get_tensor_ptr(ref);
ASSERT_TRUE(act);
EXPECT_EQ(exp, act);
}
@@ -81,7 +81,7 @@ TEST_F(DirectTensorStoreTest, heap_allocated_memory_is_tracked)
store.store_tensor(make_tensor(5));
auto mem_1 = store.getMemoryUsage();
auto ref = store.store_tensor(make_tensor(10));
- auto tensor_mem_usage = store.get_tensor(ref)->get_memory_usage();
+ auto tensor_mem_usage = store.get_tensor_ptr(ref)->get_memory_usage();
auto mem_2 = store.getMemoryUsage();
EXPECT_GT(tensor_mem_usage.usedBytes(), 500);
EXPECT_LT(tensor_mem_usage.usedBytes(), 50000);
@@ -93,14 +93,14 @@ TEST_F(DirectTensorStoreTest, heap_allocated_memory_is_tracked)
TEST_F(DirectTensorStoreTest, invalid_ref_returns_nullptr)
{
- const auto* t = store.get_tensor(EntryRef());
+ const auto* t = store.get_tensor_ptr(EntryRef());
EXPECT_FALSE(t);
}
TEST_F(DirectTensorStoreTest, hold_adds_entry_to_hold_list)
{
auto ref = store.store_tensor(make_tensor(5));
- auto tensor_mem_usage = store.get_tensor(ref)->get_memory_usage();
+ auto tensor_mem_usage = store.get_tensor_ptr(ref)->get_memory_usage();
auto mem_1 = store.getMemoryUsage();
store.holdTensor(ref);
auto mem_2 = store.getMemoryUsage();
diff --git a/searchlib/src/tests/tensor/tensor_buffer_type_mapper/CMakeLists.txt b/searchlib/src/tests/tensor/tensor_buffer_type_mapper/CMakeLists.txt
new file mode 100644
index 00000000000..e219b17ebd1
--- /dev/null
+++ b/searchlib/src/tests/tensor/tensor_buffer_type_mapper/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchlib_tensor_buffer_type_mapper_test_app TEST
+ SOURCES
+ tensor_buffer_type_mapper_test.cpp
+ DEPENDS
+ searchlib
+ GTest::GTest
+)
+vespa_add_test(NAME searchlib_tensor_buffer_type_mapper_test_app COMMAND searchlib_tensor_buffer_type_mapper_test_app)
diff --git a/searchlib/src/tests/tensor/tensor_buffer_type_mapper/tensor_buffer_type_mapper_test.cpp b/searchlib/src/tests/tensor/tensor_buffer_type_mapper/tensor_buffer_type_mapper_test.cpp
new file mode 100644
index 00000000000..8e88c103516
--- /dev/null
+++ b/searchlib/src/tests/tensor/tensor_buffer_type_mapper/tensor_buffer_type_mapper_test.cpp
@@ -0,0 +1,121 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/searchlib/tensor/tensor_buffer_type_mapper.h>
+#include <vespa/searchlib/tensor/tensor_buffer_operations.h>
+#include <vespa/eval/eval/value_type.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using search::tensor::TensorBufferOperations;
+using search::tensor::TensorBufferTypeMapper;
+using vespalib::eval::ValueType;
+
+const vespalib::string tensor_type_sparse_spec("tensor(x{})");
+const vespalib::string tensor_type_2d_spec("tensor(x{},y{})");
+const vespalib::string tensor_type_2d_mixed_spec("tensor(x{},y[2])");
+const vespalib::string float_tensor_type_spec("tensor<float>(y{})");
+const vespalib::string tensor_type_dense_spec("tensor(x[2])");
+
+struct TestParam
+{
+ vespalib::string _name;
+ std::vector<size_t> _array_sizes;
+ vespalib::string _tensor_type_spec;
+ TestParam(vespalib::string name, std::vector<size_t> array_sizes, const vespalib::string& tensor_type_spec)
+ : _name(std::move(name)),
+ _array_sizes(std::move(array_sizes)),
+ _tensor_type_spec(tensor_type_spec)
+ {
+ }
+ TestParam(const TestParam&);
+ ~TestParam();
+};
+
+TestParam::TestParam(const TestParam&) = default;
+
+TestParam::~TestParam() = default;
+
+std::ostream& operator<<(std::ostream& os, const TestParam& param)
+{
+ os << param._name;
+ return os;
+}
+
+class TensorBufferTypeMapperTest : public testing::TestWithParam<TestParam>
+{
+protected:
+ ValueType _tensor_type;
+ TensorBufferOperations _ops;
+ TensorBufferTypeMapper _mapper;
+ TensorBufferTypeMapperTest();
+ ~TensorBufferTypeMapperTest() override;
+ std::vector<size_t> get_array_sizes();
+ void select_type_ids();
+};
+
+TensorBufferTypeMapperTest::TensorBufferTypeMapperTest()
+ : testing::TestWithParam<TestParam>(),
+ _tensor_type(ValueType::from_spec(GetParam()._tensor_type_spec)),
+ _ops(_tensor_type),
+ _mapper(GetParam()._array_sizes.size(), &_ops)
+{
+}
+
+TensorBufferTypeMapperTest::~TensorBufferTypeMapperTest() = default;
+
+std::vector<size_t>
+TensorBufferTypeMapperTest::get_array_sizes()
+{
+ uint32_t max_small_subspaces_type_id = GetParam()._array_sizes.size();
+ std::vector<size_t> array_sizes;
+ for (uint32_t type_id = 1; type_id <= max_small_subspaces_type_id; ++type_id) {
+ auto num_subspaces = type_id - 1;
+ array_sizes.emplace_back(_mapper.get_array_size(type_id));
+ EXPECT_EQ(_ops.get_array_size(num_subspaces), array_sizes.back());
+ }
+ return array_sizes;
+}
+
+void
+TensorBufferTypeMapperTest::select_type_ids()
+{
+ auto& array_sizes = GetParam()._array_sizes;
+ uint32_t type_id = 0;
+ for (auto array_size : array_sizes) {
+ ++type_id;
+ EXPECT_EQ(type_id, _mapper.get_type_id(array_size));
+ EXPECT_EQ(type_id, _mapper.get_type_id(array_size - 1));
+ if (array_size == array_sizes.back()) {
+ // Fallback to indirect storage, using type id 0
+ EXPECT_EQ(0u, _mapper.get_type_id(array_size + 1));
+ } else {
+ EXPECT_EQ(type_id + 1, _mapper.get_type_id(array_size + 1));
+ }
+ }
+}
+
+/*
+ * For "dense" case, array size for type id 1 is irrelevant, since
+ * type ids 0 and 1 are not used when storing dense tensors in
+ * TensorBufferStore.
+ */
+
+VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(TensorBufferTypeMapperMultiTest,
+ TensorBufferTypeMapperTest,
+ testing::Values(TestParam("1d", {8, 16, 32, 40, 64}, tensor_type_sparse_spec),
+ TestParam("1dfloat", {4, 12, 20, 28, 36}, float_tensor_type_spec),
+ TestParam("2d", {8, 24, 40, 56, 80}, tensor_type_2d_spec),
+ TestParam("2dmixed", {8, 24, 48, 64, 96}, tensor_type_2d_mixed_spec),
+ TestParam("dense", {8, 24}, tensor_type_dense_spec)),
+ testing::PrintToStringParamName());
+
+TEST_P(TensorBufferTypeMapperTest, array_sizes_are_calculated)
+{
+ EXPECT_EQ(GetParam()._array_sizes, get_array_sizes());
+}
+
+TEST_P(TensorBufferTypeMapperTest, type_ids_are_selected)
+{
+ select_type_ids();
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchlib/src/vespa/searchlib/memoryindex/feature_store.cpp b/searchlib/src/vespa/searchlib/memoryindex/feature_store.cpp
index eb5f0ca843b..72f4a7ae579 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/feature_store.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/feature_store.cpp
@@ -68,8 +68,6 @@ FeatureStore::moveFeatures(EntryRef ref, uint64_t bitLen)
const uint8_t *src = getBits(ref);
uint64_t byteLen = (bitLen + 7) / 8;
EntryRef newRef = addFeatures(src, byteLen);
- // Mark old features as dead
- _store.incDead(ref, byteLen + Aligner::pad(byteLen));
return newRef;
}
@@ -117,7 +115,6 @@ FeatureStore::add_features_guard_bytes()
uint32_t pad = Aligner::pad(len);
auto result = _store.rawAllocator<uint8_t>(_typeId).alloc(len + pad);
memset(result.data, 0, len + pad);
- _store.incDead(result.ref, len + pad);
}
void
diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
index 46bfc0909aa..75f453ddcbc 100644
--- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
@@ -30,7 +30,6 @@ vespa_add_library(searchlib_tensor OBJECT
serialized_fast_value_attribute.cpp
small_subspaces_buffer_type.cpp
streamed_value_saver.cpp
- streamed_value_store.cpp
tensor_attribute.cpp
tensor_buffer_operations.cpp
tensor_buffer_store.cpp
diff --git a/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp
index e2271e63425..7730f340e01 100644
--- a/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp
@@ -84,7 +84,7 @@ DirectTensorAttribute::update_tensor(DocId docId,
ref = _refVector[docId].load_relaxed();
}
if (ref.valid()) {
- auto ptr = _direct_store.get_tensor(ref);
+ auto ptr = _direct_store.get_tensor_ptr(ref);
if (ptr) {
auto new_value = update.apply_to(*ptr, FastValueBuilderFactory::get());
if (new_value) {
@@ -109,7 +109,7 @@ DirectTensorAttribute::getTensor(DocId docId) const
ref = acquire_entry_ref(docId);
}
if (ref.valid()) {
- auto ptr = _direct_store.get_tensor(ref);
+ auto ptr = _direct_store.get_tensor_ptr(ref);
if (ptr) {
return FastValueBuilderFactory::get().copy(*ptr);
}
@@ -123,7 +123,7 @@ DirectTensorAttribute::get_tensor_ref(DocId docId) const
{
if (docId >= getCommittedDocIdLimit()) { return *_emptyTensor; }
- auto ptr = _direct_store.get_tensor(acquire_entry_ref(docId));
+ auto ptr = _direct_store.get_tensor_ptr(acquire_entry_ref(docId));
if ( ptr == nullptr) { return *_emptyTensor; }
return *ptr;
diff --git a/searchlib/src/vespa/searchlib/tensor/direct_tensor_saver.cpp b/searchlib/src/vespa/searchlib/tensor/direct_tensor_saver.cpp
index 024b2fe5467..0de4491cfcc 100644
--- a/searchlib/src/vespa/searchlib/tensor/direct_tensor_saver.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/direct_tensor_saver.cpp
@@ -35,7 +35,7 @@ DirectTensorAttributeSaver::onSave(IAttributeSaveTarget &saveTarget)
const uint32_t docIdLimit(_refs.size());
vespalib::nbostream stream;
for (uint32_t lid = 0; lid < docIdLimit; ++lid) {
- const vespalib::eval::Value *tensor = _tensorStore.get_tensor(_refs[lid]);
+ const vespalib::eval::Value *tensor = _tensorStore.get_tensor_ptr(_refs[lid]);
if (tensor) {
stream.clear();
encode_value(*tensor, stream);
diff --git a/searchlib/src/vespa/searchlib/tensor/direct_tensor_store.h b/searchlib/src/vespa/searchlib/tensor/direct_tensor_store.h
index 57e7453ff99..658d1ab0549 100644
--- a/searchlib/src/vespa/searchlib/tensor/direct_tensor_store.h
+++ b/searchlib/src/vespa/searchlib/tensor/direct_tensor_store.h
@@ -40,7 +40,7 @@ public:
~DirectTensorStore() override;
using RefType = TensorStoreType::RefType;
- const vespalib::eval::Value * get_tensor(EntryRef ref) const {
+ const vespalib::eval::Value * get_tensor_ptr(EntryRef ref) const {
if (!ref.valid()) {
return nullptr;
}
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp
deleted file mode 100644
index e8752a3145a..00000000000
--- a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp
+++ /dev/null
@@ -1,288 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "streamed_value_store.h"
-#include <vespa/eval/eval/value.h>
-#include <vespa/eval/eval/value_codec.h>
-#include <vespa/eval/eval/fast_value.hpp>
-#include <vespa/eval/streamed/streamed_value_builder_factory.h>
-#include <vespa/eval/streamed/streamed_value_view.h>
-#include <vespa/vespalib/datastore/buffer_type.hpp>
-#include <vespa/vespalib/datastore/compacting_buffers.h>
-#include <vespa/vespalib/datastore/compaction_context.h>
-#include <vespa/vespalib/datastore/compaction_strategy.h>
-#include <vespa/vespalib/datastore/datastore.hpp>
-#include <vespa/vespalib/objects/nbostream.h>
-#include <vespa/vespalib/util/size_literals.h>
-#include <vespa/vespalib/util/typify.h>
-#include <vespa/log/log.h>
-
-LOG_SETUP(".searchlib.tensor.streamed_value_store");
-
-using vespalib::datastore::CompactionContext;
-using vespalib::datastore::CompactionSpec;
-using vespalib::datastore::CompactionStrategy;
-using vespalib::datastore::EntryRef;
-using vespalib::datastore::Handle;
-using vespalib::datastore::ICompactionContext;
-using namespace vespalib::eval;
-using vespalib::ConstArrayRef;
-using vespalib::MemoryUsage;
-using vespalib::string_id;
-using vespalib::StringIdVector;
-
-namespace search::tensor {
-
-//-----------------------------------------------------------------------------
-
-namespace {
-
-template <typename CT, typename F>
-void each_subspace(const Value &value, size_t num_mapped, size_t dense_size, F f) {
- size_t subspace;
- std::vector<string_id> addr(num_mapped);
- std::vector<string_id*> refs;
- refs.reserve(addr.size());
- for (string_id &label: addr) {
- refs.push_back(&label);
- }
- auto cells = value.cells().typify<CT>();
- auto view = value.index().create_view({});
- view->lookup({});
- while (view->next_result(refs, subspace)) {
- size_t offset = subspace * dense_size;
- f(ConstArrayRef<string_id>(addr), ConstArrayRef<CT>(cells.begin() + offset, dense_size));
- }
-}
-
-using TensorEntry = StreamedValueStore::TensorEntry;
-
-struct CreateTensorEntry {
- template <typename CT>
- static TensorEntry::SP invoke(const Value &value, size_t num_mapped, size_t dense_size) {
- using EntryImpl = StreamedValueStore::TensorEntryImpl<CT>;
- return std::make_shared<EntryImpl>(value, num_mapped, dense_size);
- }
-};
-
-struct MyFastValueView final : Value {
- const ValueType &my_type;
- FastValueIndex my_index;
- TypedCells my_cells;
- MyFastValueView(const ValueType &type_ref, const StringIdVector &handle_view, TypedCells cells, size_t num_mapped, size_t num_spaces)
- : my_type(type_ref),
- my_index(num_mapped, handle_view, num_spaces),
- my_cells(cells)
- {
- const StringIdVector &labels = handle_view;
- for (size_t i = 0; i < num_spaces; ++i) {
- ConstArrayRef<string_id> addr(labels.data() + (i * num_mapped), num_mapped);
- my_index.map.add_mapping(FastAddrMap::hash_labels(addr));
- }
- assert(my_index.map.size() == num_spaces);
- }
- const ValueType &type() const override { return my_type; }
- const Value::Index &index() const override { return my_index; }
- TypedCells cells() const override { return my_cells; }
- MemoryUsage get_memory_usage() const override {
- MemoryUsage usage = self_memory_usage<MyFastValueView>();
- usage.merge(my_index.map.estimate_extra_memory_usage());
- return usage;
- }
-};
-
-} // <unnamed>
-
-//-----------------------------------------------------------------------------
-
-StreamedValueStore::TensorEntry::~TensorEntry() = default;
-
-StreamedValueStore::TensorEntry::SP
-StreamedValueStore::TensorEntry::create_shared_entry(const Value &value)
-{
- size_t num_mapped = value.type().count_mapped_dimensions();
- size_t dense_size = value.type().dense_subspace_size();
- return vespalib::typify_invoke<1,TypifyCellType,CreateTensorEntry>(value.type().cell_type(), value, num_mapped, dense_size);
-}
-
-template <typename CT>
-StreamedValueStore::TensorEntryImpl<CT>::TensorEntryImpl(const Value &value, size_t num_mapped, size_t dense_size)
- : handles(),
- cells()
-{
- handles.reserve(num_mapped * value.index().size());
- cells.reserve(dense_size * value.index().size());
- auto store_subspace = [&](auto addr, auto data) {
- for (string_id label: addr) {
- handles.push_back(label);
- }
- for (CT entry: data) {
- cells.push_back(entry);
- }
- };
- each_subspace<CT>(value, num_mapped, dense_size, store_subspace);
-}
-
-template <typename CT>
-Value::UP
-StreamedValueStore::TensorEntryImpl<CT>::create_fast_value_view(const ValueType &type_ref) const
-{
- size_t num_mapped = type_ref.count_mapped_dimensions();
- size_t dense_size = type_ref.dense_subspace_size();
- size_t num_spaces = cells.size() / dense_size;
- assert(dense_size * num_spaces == cells.size());
- assert(num_mapped * num_spaces == handles.view().size());
- return std::make_unique<MyFastValueView>(type_ref, handles.view(), TypedCells(cells), num_mapped, num_spaces);
-}
-
-template <typename CT>
-void
-StreamedValueStore::TensorEntryImpl<CT>::encode_value(const ValueType &type, vespalib::nbostream &target) const
-{
- size_t num_mapped = type.count_mapped_dimensions();
- size_t dense_size = type.dense_subspace_size();
- size_t num_spaces = cells.size() / dense_size;
- assert(dense_size * num_spaces == cells.size());
- assert(num_mapped * num_spaces == handles.view().size());
- StreamedValueView my_value(type, num_mapped, TypedCells(cells), num_spaces, handles.view());
- ::vespalib::eval::encode_value(my_value, target);
-}
-
-template <typename CT>
-MemoryUsage
-StreamedValueStore::TensorEntryImpl<CT>::get_memory_usage() const
-{
- MemoryUsage usage = self_memory_usage<TensorEntryImpl<CT>>();
- usage.merge(vector_extra_memory_usage(handles.view()));
- usage.merge(vector_extra_memory_usage(cells));
- return usage;
-}
-
-template <typename CT>
-StreamedValueStore::TensorEntryImpl<CT>::~TensorEntryImpl() = default;
-
-//-----------------------------------------------------------------------------
-
-constexpr size_t MIN_BUFFER_ARRAYS = 8_Ki;
-
-StreamedValueStore::TensorBufferType::TensorBufferType() noexcept
- : ParentType(1, MIN_BUFFER_ARRAYS, TensorStoreType::RefType::offsetSize())
-{
-}
-
-void
-StreamedValueStore::TensorBufferType::cleanHold(void* buffer, size_t offset, ElemCount num_elems, CleanContext clean_ctx)
-{
- TensorEntry::SP* elem = static_cast<TensorEntry::SP*>(buffer) + offset;
- const auto& empty = empty_entry();
- for (size_t i = 0; i < num_elems; ++i) {
- clean_ctx.extraBytesCleaned((*elem)->get_memory_usage().allocatedBytes());
- *elem = empty;
- ++elem;
- }
-}
-
-StreamedValueStore::StreamedValueStore(const ValueType &tensor_type)
- : TensorStore(_concrete_store),
- _concrete_store(std::make_unique<TensorBufferType>()),
- _tensor_type(tensor_type)
-{
- _concrete_store.enableFreeLists();
-}
-
-StreamedValueStore::~StreamedValueStore() = default;
-
-EntryRef
-StreamedValueStore::add_entry(TensorEntry::SP tensor)
-{
- auto ref = _concrete_store.addEntry(tensor);
- auto& state = _concrete_store.getBufferState(RefType(ref).bufferId());
- state.stats().inc_extra_used_bytes(tensor->get_memory_usage().allocatedBytes());
- return ref;
-}
-
-const StreamedValueStore::TensorEntry *
-StreamedValueStore::get_tensor_entry(EntryRef ref) const
-{
- if (!ref.valid()) {
- return nullptr;
- }
- const auto& entry = _concrete_store.getEntry(ref);
- assert(entry);
- return entry.get();
-}
-
-std::unique_ptr<vespalib::eval::Value>
-StreamedValueStore::get_tensor(EntryRef ref) const
-{
- if (const auto * ptr = get_tensor_entry(ref)) {
- return ptr->create_fast_value_view(_tensor_type);
- }
- return {};
-}
-
-void
-StreamedValueStore::holdTensor(EntryRef ref)
-{
- if (!ref.valid()) {
- return;
- }
- const auto& tensor = _concrete_store.getEntry(ref);
- assert(tensor);
- _concrete_store.holdElem(ref, 1, tensor->get_memory_usage().allocatedBytes());
-}
-
-TensorStore::EntryRef
-StreamedValueStore::move(EntryRef ref)
-{
- if (!ref.valid()) {
- return EntryRef();
- }
- const auto& old_tensor = _concrete_store.getEntry(ref);
- assert(old_tensor);
- auto new_ref = add_entry(old_tensor);
- _concrete_store.holdElem(ref, 1, old_tensor->get_memory_usage().allocatedBytes());
- return new_ref;
-}
-
-vespalib::MemoryUsage
-StreamedValueStore::update_stat(const CompactionStrategy& compaction_strategy)
-{
- auto memory_usage = _store.getMemoryUsage();
- _compaction_spec = CompactionSpec(compaction_strategy.should_compact_memory(memory_usage), false);
- return memory_usage;
-}
-
-std::unique_ptr<ICompactionContext>
-StreamedValueStore::start_compact(const CompactionStrategy& compaction_strategy)
-{
- auto compacting_buffers = _store.start_compact_worst_buffers(_compaction_spec, compaction_strategy);
- return std::make_unique<CompactionContext>(*this, std::move(compacting_buffers));
-}
-
-bool
-StreamedValueStore::encode_stored_tensor(EntryRef ref, vespalib::nbostream &target) const
-{
- if (const auto * entry = get_tensor_entry(ref)) {
- entry->encode_value(_tensor_type, target);
- return true;
- } else {
- return false;
- }
-}
-
-TensorStore::EntryRef
-StreamedValueStore::store_tensor(const Value &tensor)
-{
- assert(tensor.type() == _tensor_type);
- return add_entry(TensorEntry::create_shared_entry(tensor));
-}
-
-TensorStore::EntryRef
-StreamedValueStore::store_encoded_tensor(vespalib::nbostream &encoded)
-{
- const auto &factory = StreamedValueBuilderFactory::get();
- auto val = vespalib::eval::decode_value(encoded, factory);
- return store_tensor(*val);
-}
-
-}
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h
deleted file mode 100644
index 58137e316dd..00000000000
--- a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h
+++ /dev/null
@@ -1,84 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "tensor_store.h"
-#include <vespa/eval/eval/value_type.h>
-#include <vespa/eval/eval/value.h>
-#include <vespa/eval/streamed/streamed_value.h>
-#include <vespa/vespalib/datastore/datastore.h>
-#include <vespa/vespalib/objects/nbostream.h>
-#include <vespa/vespalib/util/shared_string_repo.h>
-
-namespace search::tensor {
-
-/**
- * Class for StreamedValue tensors in memory.
- */
-class StreamedValueStore : public TensorStore {
-public:
- using Value = vespalib::eval::Value;
- using ValueType = vespalib::eval::ValueType;
- using Handles = vespalib::SharedStringRepo::Handles;
- using MemoryUsage = vespalib::MemoryUsage;
-
- // interface for tensor entries
- struct TensorEntry {
- using SP = std::shared_ptr<TensorEntry>;
- virtual Value::UP create_fast_value_view(const ValueType &type_ref) const = 0;
- virtual void encode_value(const ValueType &type, vespalib::nbostream &target) const = 0;
- virtual MemoryUsage get_memory_usage() const = 0;
- virtual ~TensorEntry();
- static TensorEntry::SP create_shared_entry(const Value &value);
- };
-
- // implementation of tensor entries
- template <typename CT>
- struct TensorEntryImpl : public TensorEntry {
- Handles handles;
- std::vector<CT> cells;
- TensorEntryImpl(const Value &value, size_t num_mapped, size_t dense_size);
- Value::UP create_fast_value_view(const ValueType &type_ref) const override;
- void encode_value(const ValueType &type, vespalib::nbostream &target) const override;
- MemoryUsage get_memory_usage() const override;
- ~TensorEntryImpl() override;
- };
-
-private:
- // Note: Must use SP (instead of UP) because of fallbackCopy() and initializeReservedElements() in BufferType,
- // and implementation of move().
- using TensorStoreType = vespalib::datastore::DataStore<TensorEntry::SP>;
-
- class TensorBufferType : public vespalib::datastore::BufferType<TensorEntry::SP> {
- private:
- using ParentType = BufferType<TensorEntry::SP>;
- using ParentType::empty_entry;
- using CleanContext = typename ParentType::CleanContext;
- public:
- TensorBufferType() noexcept;
- void cleanHold(void* buffer, size_t offset, ElemCount num_elems, CleanContext clean_ctx) override;
- };
- TensorStoreType _concrete_store;
- const vespalib::eval::ValueType _tensor_type;
- EntryRef add_entry(TensorEntry::SP tensor);
- const TensorEntry* get_tensor_entry(EntryRef ref) const;
-public:
- StreamedValueStore(const vespalib::eval::ValueType &tensor_type);
- ~StreamedValueStore() override;
-
- using RefType = TensorStoreType::RefType;
-
- void holdTensor(EntryRef ref) override;
- EntryRef move(EntryRef ref) override;
- vespalib::MemoryUsage update_stat(const vespalib::datastore::CompactionStrategy& compaction_strategy) override;
- std::unique_ptr<vespalib::datastore::ICompactionContext> start_compact(const vespalib::datastore::CompactionStrategy& compaction_strategy) override;
-
- std::unique_ptr<vespalib::eval::Value> get_tensor(EntryRef ref) const;
- bool encode_stored_tensor(EntryRef ref, vespalib::nbostream &target) const;
-
- EntryRef store_tensor(const vespalib::eval::Value &tensor);
- EntryRef store_encoded_tensor(vespalib::nbostream &encoded);
-};
-
-
-}
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index 7aafb7c364e..b84a90465ea 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -54,6 +54,7 @@ vespa_define_module(
src/tests/data/smart_buffer
src/tests/datastore/array_store
src/tests/datastore/array_store_config
+ src/tests/datastore/buffer_stats
src/tests/datastore/buffer_type
src/tests/datastore/compact_buffer_candidates
src/tests/datastore/datastore
diff --git a/vespalib/src/tests/datastore/buffer_stats/CMakeLists.txt b/vespalib/src/tests/datastore/buffer_stats/CMakeLists.txt
new file mode 100644
index 00000000000..2463f584133
--- /dev/null
+++ b/vespalib/src/tests/datastore/buffer_stats/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_datastore_buffer_stats_test_app TEST
+ SOURCES
+ buffer_stats_test.cpp
+ DEPENDS
+ vespalib
+ GTest::GTest
+)
+vespa_add_test(NAME vespalib_datastore_buffer_stats_test_app COMMAND vespalib_datastore_buffer_stats_test_app)
diff --git a/vespalib/src/tests/datastore/buffer_stats/buffer_stats_test.cpp b/vespalib/src/tests/datastore/buffer_stats/buffer_stats_test.cpp
new file mode 100644
index 00000000000..09b2590a5f3
--- /dev/null
+++ b/vespalib/src/tests/datastore/buffer_stats/buffer_stats_test.cpp
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/datastore/buffer_stats.h>
+#include <vespa/vespalib/datastore/memory_stats.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace vespalib::datastore;
+
+TEST(BufferStatsTest, buffer_stats_to_memory_stats)
+{
+ InternalBufferStats buf;
+ buf.set_alloc_elems(17);
+ buf.pushed_back(7);
+ buf.set_dead_elems(5);
+ buf.set_hold_elems(3);
+ buf.inc_extra_used_bytes(13);
+ buf.inc_extra_hold_bytes(11);
+
+ MemoryStats mem;
+ constexpr size_t es = 8;
+ buf.add_to_mem_stats(es, mem);
+
+ EXPECT_EQ(17, mem._allocElems);
+ EXPECT_EQ(7, mem._usedElems);
+ EXPECT_EQ(5, mem._deadElems);
+ EXPECT_EQ(3, mem._holdElems);
+ EXPECT_EQ(17 * es + 13, mem._allocBytes);
+ EXPECT_EQ(7 * es + 13, mem._usedBytes);
+ EXPECT_EQ(5 * es, mem._deadBytes);
+ EXPECT_EQ(3 * es + 11, mem._holdBytes);
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
+
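
The assertions in the new buffer_stats_test.cpp above encode how the stats are expected to map to bytes: element counts are multiplied by the element size, and the extra used/hold byte counters are added on top of the alloc/used and hold totals. The following is a minimal, self-contained sketch of that arithmetic; SimpleStats, SimpleMemStats and add_to_mem_stats here are illustrative stand-ins, not the vespalib::datastore types.

// Hedged sketch of the byte accounting the test above asserts.
// SimpleStats / SimpleMemStats are stand-ins, not the real classes.
#include <cassert>
#include <cstddef>

struct SimpleStats {
    size_t alloc_elems = 0, used_elems = 0, dead_elems = 0, hold_elems = 0;
    size_t extra_used_bytes = 0, extra_hold_bytes = 0;
};

struct SimpleMemStats {
    size_t alloc_bytes = 0, used_bytes = 0, dead_bytes = 0, hold_bytes = 0;
};

// Mirrors the mapping exercised by BufferStatsTest::buffer_stats_to_memory_stats.
void add_to_mem_stats(const SimpleStats& s, size_t element_size, SimpleMemStats& m) {
    m.alloc_bytes += s.alloc_elems * element_size + s.extra_used_bytes;
    m.used_bytes  += s.used_elems  * element_size + s.extra_used_bytes;
    m.dead_bytes  += s.dead_elems  * element_size;
    m.hold_bytes  += s.hold_elems  * element_size + s.extra_hold_bytes;
}

int main() {
    SimpleStats s{17, 7, 5, 3, 13, 11};
    SimpleMemStats m;
    add_to_mem_stats(s, 8, m);
    assert(m.alloc_bytes == 17 * 8 + 13);
    assert(m.used_bytes  ==  7 * 8 + 13);
    assert(m.dead_bytes  ==  5 * 8);
    assert(m.hold_bytes  ==  3 * 8 + 11);
    return 0;
}
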
diff --git a/vespalib/src/tests/datastore/datastore/datastore_test.cpp b/vespalib/src/tests/datastore/datastore/datastore_test.cpp
index bf389e5e78e..2c4e68467da 100644
--- a/vespalib/src/tests/datastore/datastore/datastore_test.cpp
+++ b/vespalib/src/tests/datastore/datastore/datastore_test.cpp
@@ -17,7 +17,6 @@ using vespalib::alloc::MemoryAllocator;
class MyStore : public DataStore<int, EntryRefT<3, 2> > {
private:
using ParentType = DataStore<int, EntryRefT<3, 2> >;
- using ParentType::_primary_buffer_ids;
public:
MyStore() {}
explicit MyStore(std::unique_ptr<BufferType<int>> type)
@@ -35,9 +34,6 @@ public:
void trimElemHoldList(generation_t usedGen) override {
ParentType::trimElemHoldList(usedGen);
}
- void incDead(EntryRef ref, uint64_t dead) {
- ParentType::incDead(ref, dead);
- }
void ensureBufferCapacity(size_t sizeNeeded) {
ParentType::ensureBufferCapacity(0, sizeNeeded);
}
@@ -47,7 +43,7 @@ public:
void switch_primary_buffer() {
ParentType::switch_primary_buffer(0, 0u);
}
- size_t primary_buffer_id() const { return _primary_buffer_ids[0]; }
+ size_t primary_buffer_id() const { return get_primary_buffer_id(0); }
BufferState& get_active_buffer_state() {
return ParentType::getBufferState(primary_buffer_id());
}
@@ -429,11 +425,6 @@ TEST(DataStoreTest, require_that_memory_stats_are_calculated)
m._usedElems++;
assertMemStats(m, s.getMemStats());
- // inc dead
- s.incDead(r, 1);
- m._deadElems++;
- assertMemStats(m, s.getMemStats());
-
// hold buffer
s.addEntry(20);
s.addEntry(30);
@@ -474,7 +465,7 @@ TEST(DataStoreTest, require_that_memory_stats_are_calculated)
{ // increase extra hold bytes
auto prev_stats = s.getMemStats();
- s.get_active_buffer_state().stats().inc_extra_hold_bytes(30);
+ s.get_active_buffer_state().hold_elems(0, 30);
auto curr_stats = s.getMemStats();
EXPECT_EQ(prev_stats._holdBytes + 30, curr_stats._holdBytes);
}
@@ -487,7 +478,6 @@ TEST(DataStoreTest, require_that_memory_usage_is_calculated)
s.addEntry(20);
s.addEntry(30);
s.addEntry(40);
- s.incDead(r, 1);
s.holdBuffer(r.bufferId());
s.transferHoldLists(100);
vespalib::MemoryUsage m = s.getMemoryUsage();
@@ -698,9 +688,9 @@ void test_free_element_to_held_buffer(bool direct, bool before_hold_buffer)
s.holdBuffer(0); // hold last buffer
if (!before_hold_buffer) {
if (direct) {
- ASSERT_DEATH({ s.freeElem(ref, 1); }, "state.isOnHold\\(\\) && was_held");
+ ASSERT_DEATH({ s.freeElem(ref, 1); }, "isOnHold\\(\\) && was_held");
} else {
- ASSERT_DEATH({ s.holdElem(ref, 1); }, "state.isActive\\(\\)");
+ ASSERT_DEATH({ s.holdElem(ref, 1); }, "isActive\\(\\)");
}
}
s.transferHoldLists(100);
diff --git a/vespalib/src/vespa/vespalib/datastore/buffer_stats.cpp b/vespalib/src/vespa/vespalib/datastore/buffer_stats.cpp
index 0d367cf9835..8d97414626e 100644
--- a/vespalib/src/vespa/vespalib/datastore/buffer_stats.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/buffer_stats.cpp
@@ -16,14 +16,6 @@ BufferStats::BufferStats()
}
void
-BufferStats::dec_hold_elems(size_t value)
-{
- ElemCount elems = hold_elems();
- assert(elems >= value);
- _hold_elems.store(elems - value, std::memory_order_relaxed);
-}
-
-void
BufferStats::add_to_mem_stats(size_t element_size, MemoryStats& stats) const
{
size_t extra_used = extra_used_bytes();
@@ -37,13 +29,13 @@ BufferStats::add_to_mem_stats(size_t element_size, MemoryStats& stats) const
stats._holdBytes += (hold_elems() * element_size) + extra_hold_bytes();
}
-MutableBufferStats::MutableBufferStats()
+InternalBufferStats::InternalBufferStats()
: BufferStats()
{
}
void
-MutableBufferStats::clear()
+InternalBufferStats::clear()
{
_alloc_elems.store(0, std::memory_order_relaxed);
_used_elems.store(0, std::memory_order_relaxed);
@@ -53,5 +45,13 @@ MutableBufferStats::clear()
_extra_hold_bytes.store(0, std::memory_order_relaxed);
}
+void
+InternalBufferStats::dec_hold_elems(size_t value)
+{
+ ElemCount elems = hold_elems();
+ assert(elems >= value);
+ _hold_elems.store(elems - value, std::memory_order_relaxed);
+}
+
}
diff --git a/vespalib/src/vespa/vespalib/datastore/buffer_stats.h b/vespalib/src/vespa/vespalib/datastore/buffer_stats.h
index 0df74c0a79d..66f8b532c41 100644
--- a/vespalib/src/vespa/vespalib/datastore/buffer_stats.h
+++ b/vespalib/src/vespa/vespalib/datastore/buffer_stats.h
@@ -47,11 +47,7 @@ public:
size_t extra_used_bytes() const { return _extra_used_bytes.load(std::memory_order_relaxed); }
size_t extra_hold_bytes() const { return _extra_hold_bytes.load(std::memory_order_relaxed); }
- void inc_dead_elems(size_t value) { _dead_elems.store(dead_elems() + value, std::memory_order_relaxed); }
- void inc_hold_elems(size_t value) { _hold_elems.store(hold_elems() + value, std::memory_order_relaxed); }
- void dec_hold_elems(size_t value);
void inc_extra_used_bytes(size_t value) { _extra_used_bytes.store(extra_used_bytes() + value, std::memory_order_relaxed); }
- void inc_extra_hold_bytes(size_t value) { _extra_hold_bytes.store(extra_hold_bytes() + value, std::memory_order_relaxed); }
void add_to_mem_stats(size_t element_size, MemoryStats& stats) const;
};
@@ -59,13 +55,17 @@ public:
/**
* Provides low-level access to buffer stats for integration in BufferState.
*/
-class MutableBufferStats : public BufferStats {
+class InternalBufferStats : public BufferStats {
public:
- MutableBufferStats();
+ InternalBufferStats();
void clear();
void set_alloc_elems(size_t value) { _alloc_elems.store(value, std::memory_order_relaxed); }
void set_dead_elems(size_t value) { _dead_elems.store(value, std::memory_order_relaxed); }
void set_hold_elems(size_t value) { _hold_elems.store(value, std::memory_order_relaxed); }
+ void inc_dead_elems(size_t value) { _dead_elems.store(dead_elems() + value, std::memory_order_relaxed); }
+ void inc_hold_elems(size_t value) { _hold_elems.store(hold_elems() + value, std::memory_order_relaxed); }
+ void dec_hold_elems(size_t value);
+ void inc_extra_hold_bytes(size_t value) { _extra_hold_bytes.store(extra_hold_bytes() + value, std::memory_order_relaxed); }
std::atomic<ElemCount>& used_elems_ref() { return _used_elems; }
std::atomic<ElemCount>& dead_elems_ref() { return _dead_elems; }
std::atomic<size_t>& extra_used_bytes_ref() { return _extra_used_bytes; }
diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp
index 35455a193d2..e9ff243fb08 100644
--- a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp
@@ -176,6 +176,39 @@ BufferState::disableElemHoldList()
_disableElemHoldList = true;
}
+bool
+BufferState::hold_elems(size_t num_elems, size_t extra_bytes)
+{
+ assert(isActive());
+ if (_disableElemHoldList) {
+ // The elements are directly marked as dead as they are not put on hold.
+ _stats.inc_dead_elems(num_elems);
+ return true;
+ }
+ _stats.inc_hold_elems(num_elems);
+ _stats.inc_extra_hold_bytes(extra_bytes);
+ return false;
+}
+
+void
+BufferState::free_elems(EntryRef ref, size_t num_elems, bool was_held, size_t ref_offset)
+{
+ if (isActive()) {
+ if (_free_list.enabled() && (num_elems == getArraySize())) {
+ _free_list.push_entry(ref);
+ }
+ } else {
+ assert(isOnHold() && was_held);
+ }
+ _stats.inc_dead_elems(num_elems);
+ if (was_held) {
+ _stats.dec_hold_elems(num_elems);
+ }
+ getTypeHandler()->cleanHold(_buffer.get(), (ref_offset * _arraySize), num_elems,
+ BufferTypeBase::CleanContext(_stats.extra_used_bytes_ref(),
+ _stats.extra_hold_bytes_ref()));
+}
+
void
BufferState::fallbackResize(uint32_t bufferId,
size_t elementsNeeded,
diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.h b/vespalib/src/vespa/vespalib/datastore/bufferstate.h
index c35a51b0c99..0acad7d3ab1 100644
--- a/vespalib/src/vespa/vespalib/datastore/bufferstate.h
+++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.h
@@ -39,7 +39,7 @@ public:
};
private:
- MutableBufferStats _stats;
+ InternalBufferStats _stats;
BufferFreeList _free_list;
std::atomic<BufferTypeBase*> _typeHandler;
Alloc _buffer;
@@ -83,24 +83,34 @@ public:
void onFree(std::atomic<void*>& buffer);
/**
- * Disable hold of elements, just mark then as dead without cleanup.
+ * Disable hold of elements, just mark elements as dead without cleanup.
* Typically used when tearing down data structure in a controlled manner.
*/
void disableElemHoldList();
+ /**
+ * Update stats to reflect that the given elements are put on hold.
+ * Returns true if element hold list is disabled for this buffer.
+ */
+ bool hold_elems(size_t num_elems, size_t extra_bytes);
+
+ /**
+ * Free the given elements and update stats accordingly.
+ *
+ * The given entry ref is put on the free list (if enabled).
+ * Hold cleaning of elements is executed on the buffer type.
+ */
+ void free_elems(EntryRef ref, size_t num_elems, bool was_held, size_t ref_offset);
+
BufferStats& stats() { return _stats; }
const BufferStats& stats() const { return _stats; }
- BufferFreeList& free_list() { return _free_list; }
- const BufferFreeList& free_list() const { return _free_list; }
+
+ void enable_free_list(FreeList& type_free_list) { _free_list.enable(type_free_list); }
+ void disable_free_list() { _free_list.disable(); }
size_t size() const { return _stats.size(); }
size_t capacity() const { return _stats.capacity(); }
size_t remaining() const { return _stats.remaining(); }
- void cleanHold(void *buffer, size_t offset, ElemCount numElems) {
- getTypeHandler()->cleanHold(buffer, offset, numElems,
- BufferTypeBase::CleanContext(_stats.extra_used_bytes_ref(),
- _stats.extra_hold_bytes_ref()));
- }
void dropBuffer(uint32_t buffer_id, std::atomic<void*>& buffer);
uint32_t getTypeId() const { return _typeId; }
uint32_t getArraySize() const { return _arraySize; }
@@ -119,7 +129,6 @@ public:
const BufferTypeBase *getTypeHandler() const { return _typeHandler.load(std::memory_order_relaxed); }
BufferTypeBase *getTypeHandler() { return _typeHandler.load(std::memory_order_relaxed); }
- bool hasDisabledElemHoldList() const { return _disableElemHoldList; }
void resume_primary_buffer(uint32_t buffer_id);
};
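
BufferState::hold_elems() and free_elems(), documented in the header changes above, pull stats bookkeeping that DataStoreT previously did through stats() and cleanHold() into BufferState itself. The boolean returned by hold_elems() tells the caller whether the element hold list is disabled, in which case the elements were counted directly as dead and nothing needs to be queued for later cleanup. Below is a rough, self-contained sketch of that calling pattern; FakeBufferState, HoldEntry and hold_elem() are hypothetical stand-ins for the real classes.

// Hedged sketch of the hold_elems() contract; these types are illustrative
// stand-ins, not the vespalib::datastore classes.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct HoldEntry { uint32_t ref; size_t num_elems; };

struct FakeBufferState {
    bool   hold_list_disabled = false;
    size_t dead_elems = 0, held_elems = 0, extra_hold_bytes = 0;

    // Returns true when the hold list is disabled: elements go straight to dead.
    bool hold_elems(size_t num_elems, size_t extra_bytes) {
        if (hold_list_disabled) {
            dead_elems += num_elems;
            return true;
        }
        held_elems += num_elems;
        extra_hold_bytes += extra_bytes;
        return false;
    }
};

// Mirrors the shape of DataStoreT::holdElem() after this change: only queue
// the entry when the buffer actually put the elements on hold.
void hold_elem(FakeBufferState& state, std::vector<HoldEntry>& hold1_list,
               uint32_t ref, size_t num_elems, size_t extra_bytes) {
    if (!state.hold_elems(num_elems, extra_bytes)) {
        hold1_list.push_back({ref, num_elems});
    }
}

int main() {
    FakeBufferState state;
    std::vector<HoldEntry> hold1;
    hold_elem(state, hold1, 42, 1, 16);
    std::cout << "queued=" << hold1.size() << " on_hold=" << state.held_elems << '\n';
    return 0;
}
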
diff --git a/vespalib/src/vespa/vespalib/datastore/datastore.h b/vespalib/src/vespa/vespalib/datastore/datastore.h
index 86c39b547d3..6ff71238a89 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastore.h
+++ b/vespalib/src/vespa/vespalib/datastore/datastore.h
@@ -38,17 +38,6 @@ public:
~DataStoreT() override;
/**
- * Increase number of dead elements in buffer.
- *
- * @param ref Reference to dead stored features
- * @param dead Number of newly dead elements
- */
- void incDead(EntryRef ref, size_t deadElems) {
- RefType intRef(ref);
- DataStoreBase::incDead(intRef.bufferId(), deadElems);
- }
-
- /**
* Free element(s).
*/
void freeElem(EntryRef ref, size_t numElems) { free_elem_internal(ref, numElems, false); }
@@ -97,7 +86,6 @@ class DataStore : public DataStoreT<RefT>
protected:
typedef DataStoreT<RefT> ParentType;
using ParentType::ensureBufferCapacity;
- using ParentType::_primary_buffer_ids;
using ParentType::getEntry;
using ParentType::dropBuffers;
using ParentType::init_primary_buffers;
diff --git a/vespalib/src/vespa/vespalib/datastore/datastore.hpp b/vespalib/src/vespa/vespalib/datastore/datastore.hpp
index 90f7507f80f..18abe29d04f 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastore.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/datastore.hpp
@@ -26,19 +26,7 @@ DataStoreT<RefT>::free_elem_internal(EntryRef ref, size_t numElems, bool was_hel
{
RefType intRef(ref);
BufferState &state = getBufferState(intRef.bufferId());
- if (state.isActive()) {
- if (state.free_list().enabled() && (numElems == state.getArraySize())) {
- state.free_list().push_entry(ref);
- }
- } else {
- assert(state.isOnHold() && was_held);
- }
- state.stats().inc_dead_elems(numElems);
- if (was_held) {
- state.stats().dec_hold_elems(numElems);
- }
- state.cleanHold(getBuffer(intRef.bufferId()),
- intRef.offset() * state.getArraySize(), numElems);
+ state.free_elems(ref, numElems, was_held, intRef.offset());
}
template <typename RefT>
@@ -47,33 +35,27 @@ DataStoreT<RefT>::holdElem(EntryRef ref, size_t numElems, size_t extraBytes)
{
RefType intRef(ref);
BufferState &state = getBufferState(intRef.bufferId());
- assert(state.isActive());
- if (state.hasDisabledElemHoldList()) {
- state.stats().inc_dead_elems(numElems);
- return;
+ if (!state.hold_elems(numElems, extraBytes)) {
+ _elemHold1List.push_back(ElemHold1ListElem(ref, numElems));
}
- _elemHold1List.push_back(ElemHold1ListElem(ref, numElems));
- state.stats().inc_hold_elems(numElems);
- state.stats().inc_extra_hold_bytes(extraBytes);
}
template <typename RefT>
void
DataStoreT<RefT>::trimElemHoldList(generation_t usedGen)
{
- ElemHold2List &elemHold2List = _elemHold2List;
-
- ElemHold2List::iterator it(elemHold2List.begin());
- ElemHold2List::iterator ite(elemHold2List.end());
+ auto it = _elemHold2List.begin();
+ auto ite = _elemHold2List.end();
uint32_t freed = 0;
for (; it != ite; ++it) {
- if (static_cast<sgeneration_t>(it->_generation - usedGen) >= 0)
+ if (static_cast<sgeneration_t>(it->_generation - usedGen) >= 0) {
break;
+ }
free_elem_internal(it->_ref, it->_len, true);
++freed;
}
if (freed != 0) {
- elemHold2List.erase(elemHold2List.begin(), it);
+ _elemHold2List.erase(_elemHold2List.begin(), it);
}
}
@@ -81,14 +63,12 @@ template <typename RefT>
void
DataStoreT<RefT>::clearElemHoldList()
{
- ElemHold2List &elemHold2List = _elemHold2List;
-
- ElemHold2List::iterator it(elemHold2List.begin());
- ElemHold2List::iterator ite(elemHold2List.end());
+ auto it = _elemHold2List.begin();
+ auto ite = _elemHold2List.end();
for (; it != ite; ++it) {
free_elem_internal(it->_ref, it->_len, true);
}
- elemHold2List.clear();
+ _elemHold2List.clear();
}
template <typename RefT>
diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
index 302a1b49219..dd6c767e9c6 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
@@ -223,9 +223,8 @@ DataStoreBase::addType(BufferTypeBase *typeHandler)
void
DataStoreBase::transferElemHoldList(generation_t generation)
{
- ElemHold2List &elemHold2List = _elemHold2List;
- for (const ElemHold1ListElem & elemHold1 : _elemHold1List) {
- elemHold2List.push_back(ElemHold2ListElem(elemHold1, generation));
+ for (const auto& elemHold1 : _elemHold1List) {
+ _elemHold2List.push_back(ElemHold2ListElem(elemHold1, generation));
}
_elemHold1List.clear();
}
@@ -289,18 +288,18 @@ DataStoreBase::holdBuffer(uint32_t bufferId)
{
_states[bufferId].onHold(bufferId);
size_t holdBytes = 0u; // getMemStats() still accounts held buffers
- GenerationHeldBase::UP hold(new BufferHold(holdBytes, *this, bufferId));
+ auto hold = std::make_unique<BufferHold>(holdBytes, *this, bufferId);
_genHolder.hold(std::move(hold));
}
void
DataStoreBase::enableFreeLists()
{
- for (BufferState & bState : _states) {
+ for (auto& bState : _states) {
if (!bState.isActive() || bState.getCompacting()) {
continue;
}
- bState.free_list().enable(_free_lists[bState.getTypeId()]);
+ bState.enable_free_list(_free_lists[bState.getTypeId()]);
}
_freeListsEnabled = true;
}
@@ -308,8 +307,8 @@ DataStoreBase::enableFreeLists()
void
DataStoreBase::disableFreeLists()
{
- for (BufferState & bState : _states) {
- bState.free_list().disable();
+ for (auto& bState : _states) {
+ bState.disable_free_list();
}
_freeListsEnabled = false;
}
@@ -321,17 +320,11 @@ DataStoreBase::enableFreeList(uint32_t bufferId)
if (_freeListsEnabled &&
state.isActive() &&
!state.getCompacting()) {
- state.free_list().enable(_free_lists[state.getTypeId()]);
+ state.enable_free_list(_free_lists[state.getTypeId()]);
}
}
void
-DataStoreBase::disableFreeList(uint32_t bufferId)
-{
- _states[bufferId].free_list().disable();
-}
-
-void
DataStoreBase::disableElemHoldList()
{
for (auto &state : _states) {
@@ -346,9 +339,9 @@ DataStoreBase::getMemStats() const
{
MemoryStats stats;
- for (const BufferState & bState: _states) {
+ for (const auto& bState: _states) {
auto typeHandler = bState.getTypeHandler();
- BufferState::State state = bState.getState();
+ auto state = bState.getState();
if ((state == BufferState::State::FREE) || (typeHandler == nullptr)) {
++stats._freeBuffers;
} else if (state == BufferState::State::ACTIVE) {
@@ -376,7 +369,7 @@ DataStoreBase::getAddressSpaceUsage() const
size_t usedArrays = 0;
size_t deadArrays = 0;
size_t limitArrays = 0;
- for (const BufferState & bState: _states) {
+ for (const auto& bState: _states) {
if (bState.isActive()) {
uint32_t arraySize = bState.getArraySize();
usedArrays += bState.size() / arraySize;
@@ -392,7 +385,7 @@ DataStoreBase::getAddressSpaceUsage() const
LOG_ABORT("should not be reached");
}
}
- return vespalib::AddressSpace(usedArrays, deadArrays, limitArrays);
+ return {usedArrays, deadArrays, limitArrays};
}
void
@@ -429,12 +422,11 @@ DataStoreBase::fallbackResize(uint32_t bufferId, size_t elemsNeeded)
state.fallbackResize(bufferId, elemsNeeded,
_buffers[bufferId].get_atomic_buffer(),
toHoldBuffer);
- GenerationHeldBase::UP
- hold(new FallbackHold(oldAllocElems * elementSize,
- std::move(toHoldBuffer),
- oldUsedElems,
- state.getTypeHandler(),
- state.getTypeId()));
+ auto hold = std::make_unique<FallbackHold>(oldAllocElems * elementSize,
+ std::move(toHoldBuffer),
+ oldUsedElems,
+ state.getTypeHandler(),
+ state.getTypeId());
if (!_initializing) {
_genHolder.hold(std::move(hold));
}
@@ -452,7 +444,7 @@ DataStoreBase::markCompacting(uint32_t bufferId)
assert(!state.getCompacting());
state.setCompacting();
state.disableElemHoldList();
- state.free_list().disable();
+ state.disable_free_list();
inc_compaction_count();
}
@@ -460,9 +452,15 @@ std::unique_ptr<CompactingBuffers>
DataStoreBase::start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy)
{
// compact memory usage
- CompactBufferCandidates elem_buffers(_numBuffers, compaction_strategy.get_max_buffers(), compaction_strategy.get_active_buffers_ratio(), compaction_strategy.getMaxDeadBytesRatio() / 2, CompactionStrategy::DEAD_BYTES_SLACK);
+ CompactBufferCandidates elem_buffers(_numBuffers, compaction_strategy.get_max_buffers(),
+ compaction_strategy.get_active_buffers_ratio(),
+ compaction_strategy.getMaxDeadBytesRatio() / 2,
+ CompactionStrategy::DEAD_BYTES_SLACK);
// compact address space
- CompactBufferCandidates array_buffers(_numBuffers, compaction_strategy.get_max_buffers(), compaction_strategy.get_active_buffers_ratio(), compaction_strategy.getMaxDeadAddressSpaceRatio() / 2, CompactionStrategy::DEAD_ADDRESS_SPACE_SLACK);
+ CompactBufferCandidates array_buffers(_numBuffers, compaction_strategy.get_max_buffers(),
+ compaction_strategy.get_active_buffers_ratio(),
+ compaction_strategy.getMaxDeadAddressSpaceRatio() / 2,
+ CompactionStrategy::DEAD_ADDRESS_SPACE_SLACK);
uint32_t free_buffers = 0;
for (uint32_t bufferId = 0; bufferId < _numBuffers; ++bufferId) {
const auto &state = getBufferState(bufferId);
diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.h b/vespalib/src/vespa/vespalib/datastore/datastorebase.h
index 4038e12efee..28c47207a9f 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastorebase.h
+++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.h
@@ -25,9 +25,10 @@ class CompactionStrategy;
*/
class DataStoreBase
{
-public:
+protected:
/**
- * Hold list before freeze, before knowing how long elements must be held.
+ * Hold list element used in the first phase of holding (before freeze),
+ * before knowing how long elements must be held.
*/
class ElemHold1ListElem
{
@@ -41,10 +42,27 @@ public:
{ }
};
-protected:
using generation_t = vespalib::GenerationHandler::generation_t;
using sgeneration_t = vespalib::GenerationHandler::sgeneration_t;
+ /**
+ * Hold list element used in the second phase of holding (at freeze),
+ * when knowing how long elements must be held.
+ */
+ class ElemHold2ListElem : public ElemHold1ListElem
+ {
+ public:
+ generation_t _generation;
+
+ ElemHold2ListElem(const ElemHold1ListElem &hold1, generation_t generation)
+ : ElemHold1ListElem(hold1),
+ _generation(generation)
+ { }
+ };
+
+ using ElemHold1List = vespalib::Array<ElemHold1ListElem>;
+ using ElemHold2List = std::deque<ElemHold2ListElem>;
+
private:
class BufferAndTypeId {
public:
@@ -60,32 +78,16 @@ private:
uint32_t _typeId;
};
std::vector<BufferAndTypeId> _buffers; // For fast mapping with known types
-protected:
+
// Provides a mapping from typeId -> primary buffer for that type.
// The primary buffer is used for allocations of new element(s) if no available slots are found in free lists.
std::vector<uint32_t> _primary_buffer_ids;
+protected:
void* getBuffer(uint32_t bufferId) { return _buffers[bufferId].get_buffer_relaxed(); }
/**
- * Hold list at freeze, when knowing how long elements must be held
- */
- class ElemHold2ListElem : public ElemHold1ListElem
- {
- public:
- generation_t _generation;
-
- ElemHold2ListElem(const ElemHold1ListElem &hold1, generation_t generation)
- : ElemHold1ListElem(hold1),
- _generation(generation)
- { }
- };
-
- using ElemHold1List = vespalib::Array<ElemHold1ListElem>;
- using ElemHold2List = std::deque<ElemHold2ListElem>;
-
- /**
- * Class used to hold the old buffer as part of fallbackResize().
+ * Class used to hold the entire old buffer as part of fallbackResize().
*/
class FallbackHold : public vespalib::GenerationHeldBase
{
@@ -129,6 +131,7 @@ protected:
virtual ~DataStoreBase();
+private:
/**
* Get the next buffer id after the given buffer id.
*/
@@ -138,6 +141,7 @@ protected:
ret = 0;
return ret;
}
+protected:
/**
* Get the primary buffer for the given type id.
@@ -156,6 +160,7 @@ protected:
virtual void clearElemHoldList() = 0;
void markCompacting(uint32_t bufferId);
+
public:
uint32_t addType(BufferTypeBase *typeHandler);
void init_primary_buffers();
@@ -191,9 +196,11 @@ public:
*/
void switch_primary_buffer(uint32_t typeId, size_t elemsNeeded);
+private:
bool consider_grow_active_buffer(uint32_t type_id, size_t elems_needed);
void switch_or_grow_primary_buffer(uint32_t typeId, size_t elemsNeeded);
+public:
vespalib::MemoryUsage getMemoryUsage() const;
vespalib::AddressSpace getAddressSpaceUsage() const;
@@ -205,6 +212,8 @@ public:
const BufferState &getBufferState(uint32_t bufferId) const { return _states[bufferId]; }
BufferState &getBufferState(uint32_t bufferId) { return _states[bufferId]; }
uint32_t getNumBuffers() const { return _numBuffers; }
+
+private:
bool hasElemHold1() const { return !_elemHold1List.empty(); }
/**
@@ -212,16 +221,19 @@ public:
*/
void transferElemHoldList(generation_t generation);
+public:
/**
* Transfer holds from hold1 to hold2 lists, assigning generation.
*/
void transferHoldLists(generation_t generation);
+private:
/**
* Hold of buffer has ended.
*/
void doneHoldBuffer(uint32_t bufferId);
+public:
/**
* Trim hold lists, freeing buffers that no longer needs to be held.
*
@@ -253,12 +265,6 @@ public:
void dropBuffers();
-
- void incDead(uint32_t bufferId, size_t deadElems) {
- BufferState &state = _states[bufferId];
- state.stats().inc_dead_elems(deadElems);
- }
-
/**
* Enable free list management.
* This only works for fixed size elements.
@@ -270,16 +276,14 @@ public:
*/
void disableFreeLists();
+private:
/**
* Enable free list management.
* This only works for fixed size elements.
*/
void enableFreeList(uint32_t bufferId);
- /**
- * Disable free list management.
- */
- void disableFreeList(uint32_t bufferId);
+public:
void disableElemHoldList();
bool has_free_lists_enabled() const { return _freeListsEnabled; }
@@ -312,14 +316,18 @@ private:
void onActive(uint32_t bufferId, uint32_t typeId, size_t elemsNeeded);
void inc_hold_buffer_count();
+
public:
uint32_t getTypeId(uint32_t bufferId) const {
return _buffers[bufferId].getTypeId();
}
void finishCompact(const std::vector<uint32_t> &toHold);
+
+private:
void fallbackResize(uint32_t bufferId, size_t elementsNeeded);
+public:
vespalib::GenerationHolder &getGenerationHolder() {
return _genHolder;
}
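
The relocated ElemHold1ListElem and ElemHold2ListElem classes describe a two-phase hold scheme: freed-but-held elements first sit in the hold1 list without a generation, transferElemHoldList() later stamps them with the current generation and moves them to the hold2 list, and trimElemHoldList() frees entries whose generation is older than the oldest generation still in use. The following is a compact stand-alone model of that flow, with hypothetical names (Hold1, Hold2, HoldModel) rather than the real DataStoreBase/DataStoreT machinery, and with the signed-generation wraparound comparison simplified away.

// Hedged model of the two-phase element hold lists; not the real classes.
#include <cstddef>
#include <cstdint>
#include <deque>
#include <iostream>
#include <vector>

using generation_t = uint64_t;

struct Hold1 { uint32_t ref; size_t len; };
struct Hold2 { uint32_t ref; size_t len; generation_t generation; };

struct HoldModel {
    std::vector<Hold1> hold1;  // phase 1: generation not yet known
    std::deque<Hold2>  hold2;  // phase 2: stamped with a generation
    size_t freed = 0;

    void hold(uint32_t ref, size_t len) { hold1.push_back({ref, len}); }

    // Corresponds to transferElemHoldList(generation).
    void transfer(generation_t generation) {
        for (const auto& h : hold1) {
            hold2.push_back({h.ref, h.len, generation});
        }
        hold1.clear();
    }

    // Corresponds to trimElemHoldList(used_gen): free entries whose generation
    // is strictly older than the oldest generation still observable by readers.
    void trim(generation_t used_gen) {
        while (!hold2.empty() && hold2.front().generation < used_gen) {
            ++freed;  // the real code calls free_elem_internal(ref, len, true)
            hold2.pop_front();
        }
    }
};

int main() {
    HoldModel m;
    m.hold(1, 4);
    m.hold(2, 4);
    m.transfer(10);   // freeze: stamp generation 10
    m.trim(10);       // nothing freed: generation 10 may still be in use
    m.trim(11);       // generation 10 is now safe to reclaim
    std::cout << "freed=" << m.freed << '\n';  // prints freed=2
    return 0;
}
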
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
new file mode 100644
index 00000000000..e03e0b07944
--- /dev/null
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This RequestProcessor logs requests to disk. It batches the requests to do
+ * the I/O efficiently. The request is not passed to the next RequestProcessor
+ * until its log has been synced to disk.
+ *
+ * SyncRequestProcessor is used in 3 different cases:
+ * 1. Leader - Syncs the request to disk and forwards it to AckRequestProcessor,
+ *    which sends the ack back to itself.
+ * 2. Follower - Syncs the request to disk and forwards it to
+ *    SendAckRequestProcessor, which sends the packets to the leader.
+ *    SendAckRequestProcessor is flushable, which allows us to force
+ *    packets to be pushed to the leader.
+ * 3. Observer - Syncs the committed request to disk (received as an INFORM
+ *    packet). It never sends an ack back to the leader, so the nextProcessor
+ *    will be null. This changes the semantics of the txnlog on the observer,
+ *    since it only contains committed txns.
+ */
+public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
+
+ private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
+
+ private static class FlushRequest extends Request {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ public FlushRequest() {
+ super(null, 0, 0, 0, null, null);
+ }
+ }
+
+ private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null);
+ private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null);
+
+ private static class DelayingProcessor implements RequestProcessor, Flushable {
+ private final RequestProcessor next;
+ private Queue<Request> delayed = null;
+ private DelayingProcessor(RequestProcessor next) {
+ this.next = next;
+ }
+ @Override
+ public void flush() throws IOException {
+ if (delayed == null && next instanceof Flushable) {
+ ((Flushable) next).flush();
+ }
+ }
+ @Override
+ public void processRequest(Request request) throws RequestProcessorException {
+ if (delayed == null) {
+ next.processRequest(request);
+ } else {
+ delayed.add(request);
+ }
+ }
+ @Override
+ public void shutdown() {
+ next.shutdown();
+ }
+ private void close() {
+ if (delayed == null) {
+ delayed = new ArrayDeque<>();
+ }
+ }
+ private void open() throws RequestProcessorException {
+ if (delayed != null) {
+ for (Request request : delayed) {
+ next.processRequest(request);
+ }
+ delayed = null;
+ }
+ }
+ }
+
+ /** The number of log entries to log before starting a snapshot */
+ private static int snapCount = ZooKeeperServer.getSnapCount();
+
+ /**
+ * The total size of log entries before starting a snapshot
+ */
+ private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes();
+
+ /**
+ * Random numbers used to vary snapshot timing
+ */
+ private int randRoll;
+ private long randSize;
+
+ private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
+
+ private final Semaphore snapThreadMutex = new Semaphore(1);
+
+ private final ZooKeeperServer zks;
+
+ private final DelayingProcessor nextProcessor;
+
+ /**
+ * Transactions that have been written and are waiting to be flushed to
+ * disk. Basically this is the list of SyncItems whose callbacks will be
+ * invoked after flush returns successfully.
+ */
+ private final Queue<Request> toFlush;
+ private long lastFlushTime;
+
+ public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
+ super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
+ this.zks = zks;
+ this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor);
+ this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
+ }
+
+ /**
+ * used by tests to check for changing
+ * snapcounts
+ * @param count
+ */
+ public static void setSnapCount(int count) {
+ snapCount = count;
+ }
+
+ /**
+ * used by tests to get the snapcount
+ * @return the snapcount
+ */
+ public static int getSnapCount() {
+ return snapCount;
+ }
+
+ private long getRemainingDelay() {
+ long flushDelay = zks.getFlushDelay();
+ long duration = Time.currentElapsedTime() - lastFlushTime;
+ if (duration < flushDelay) {
+ return flushDelay - duration;
+ }
+ return 0;
+ }
+
+ /** If both flushDelay and maxBatchSize are set (greater than 0), flush
+ * whenever either condition is hit. If only one or the other is
+ * set, flush only when the relevant condition is hit.
+ */
+ private boolean shouldFlush() {
+ long flushDelay = zks.getFlushDelay();
+ long maxBatchSize = zks.getMaxBatchSize();
+ if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
+ return true;
+ }
+ return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
+ }
+
+ /**
+ * used by tests to check for changing
+ * the snap size in bytes
+ * @param size
+ */
+ public static void setSnapSizeInBytes(long size) {
+ snapSizeInBytes = size;
+ }
+
+ private boolean shouldSnapshot() {
+ int logCount = zks.getZKDatabase().getTxnCount();
+ long logSize = zks.getZKDatabase().getTxnSize();
+ return (logCount > (snapCount / 2 + randRoll))
+ || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
+ }
+
+ private void resetSnapshotStats() {
+ randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
+ randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
+ }
+
+ @Override
+ public void run() {
+ try {
+ // we do this in an attempt to ensure that not all of the servers
+ // in the ensemble take a snapshot at the same time
+ resetSnapshotStats();
+ lastFlushTime = Time.currentElapsedTime();
+ while (true) {
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
+
+ long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
+ Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
+ if (si == null) {
+ /* We timed out looking for more writes to batch, go ahead and flush immediately */
+ flush();
+ si = queuedRequests.take();
+ }
+
+ if (si == REQUEST_OF_DEATH) {
+ break;
+ }
+
+ if (si == turnForwardingDelayOn) {
+ nextProcessor.close();
+ continue;
+ }
+ if (si == turnForwardingDelayOff) {
+ nextProcessor.open();
+ continue;
+ }
+
+ if (si instanceof FlushRequest) {
+ flush();
+ ((FlushRequest) si).latch.countDown();
+ continue;
+ }
+
+ long startProcessTime = Time.currentElapsedTime();
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
+
+ // track the number of records written to the log
+ if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
+ if (shouldSnapshot()) {
+ resetSnapshotStats();
+ // roll the log
+ zks.getZKDatabase().rollLog();
+ // take a snapshot
+ if (!snapThreadMutex.tryAcquire()) {
+ LOG.warn("Too busy to snap, skipping");
+ } else {
+ new ZooKeeperThread("Snapshot Thread") {
+ public void run() {
+ try {
+ zks.takeSnapshot();
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ } finally {
+ snapThreadMutex.release();
+ }
+ }
+ }.start();
+ }
+ }
+ } else if (toFlush.isEmpty()) {
+ // optimization for read heavy workloads
+ // iff this is a read or a throttled request(which doesn't need to be written to the disk),
+ // and there are no pending flushes (writes), then just pass this to the next processor
+ if (nextProcessor != null) {
+ nextProcessor.processRequest(si);
+ nextProcessor.flush();
+ }
+ continue;
+ }
+ toFlush.add(si);
+ if (shouldFlush()) {
+ flush();
+ }
+ ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
+ }
+ } catch (Throwable t) {
+ handleException(this.getName(), t);
+ }
+ LOG.info("SyncRequestProcessor exited!");
+ }
+
+ /** Flushes all pending writes, and waits for this to complete. */
+ public void syncFlush() throws InterruptedException {
+ FlushRequest marker = new FlushRequest();
+ queuedRequests.add(marker);
+ marker.latch.await();
+ }
+
+ public void setDelayForwarding(boolean delayForwarding) {
+ queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff);
+ }
+
+ private void flush() throws IOException, RequestProcessorException {
+ if (this.toFlush.isEmpty()) {
+ return;
+ }
+
+ ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
+
+ long flushStartTime = Time.currentElapsedTime();
+ zks.getZKDatabase().commit();
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
+
+ if (this.nextProcessor == null) {
+ this.toFlush.clear();
+ } else {
+ while (!this.toFlush.isEmpty()) {
+ final Request i = this.toFlush.remove();
+ long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
+ this.nextProcessor.processRequest(i);
+ }
+ nextProcessor.flush();
+ }
+ lastFlushTime = Time.currentElapsedTime();
+ }
+
+ public void shutdown() {
+ LOG.info("Shutting down");
+ queuedRequests.add(REQUEST_OF_DEATH);
+ try {
+ this.join();
+ this.flush();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while wating for {} to finish", this);
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ LOG.warn("Got IO exception during shutdown");
+ } catch (RequestProcessorException e) {
+ LOG.warn("Got request processor exception during shutdown");
+ }
+ if (nextProcessor != null) {
+ nextProcessor.shutdown();
+ }
+ }
+
+ public void processRequest(final Request request) {
+ Objects.requireNonNull(request, "Request cannot be null");
+
+ request.syncQueueStartTime = Time.currentElapsedTime();
+ queuedRequests.add(request);
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
+ }
+
+}
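
The batching decision in the new SyncRequestProcessor is concentrated in shouldFlush(): a flush happens once the configured flush delay has elapsed since the last flush or the pending batch has reached the configured maximum size, and each trigger is ignored when its limit is zero. A small sketch of that policy follows, written in C++ like the other sketches in this section; should_flush() and its parameters are illustrative names, not ZooKeeper APIs.

// Hedged restatement of the shouldFlush() policy shown above.
#include <cassert>
#include <cstddef>
#include <cstdint>

// flush_delay_ms == 0 disables the time-based trigger, max_batch_size == 0
// disables the size-based trigger; otherwise flush when either limit is hit.
bool should_flush(int64_t flush_delay_ms, int64_t elapsed_since_flush_ms,
                  int64_t max_batch_size, size_t pending) {
    if (flush_delay_ms > 0 && elapsed_since_flush_ms >= flush_delay_ms) {
        return true;
    }
    return max_batch_size > 0 && pending >= static_cast<size_t>(max_batch_size);
}

int main() {
    assert(should_flush(100, 120, 1000, 5));    // delay expired
    assert(should_flush(100, 10, 1000, 1000));  // batch full
    assert(!should_flush(100, 10, 1000, 5));    // neither limit hit
    assert(!should_flush(0, 500, 1000, 5));     // time trigger disabled
    return 0;
}
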
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
new file mode 100644
index 00000000000..8e80fae57dc
--- /dev/null
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.net.ssl.SSLSocket;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.server.ExitCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TxnLogEntry;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.ConfigUtils;
+import org.apache.zookeeper.server.util.MessageTracker;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnDigest;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.apache.zookeeper.util.ServiceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the superclass of two of the three main actors in a ZK
+ * ensemble: Followers and Observers. Both Followers and Observers share
+ * a good deal of code which is moved into Peer to avoid duplication.
+ */
+public class Learner {
+
+ static class PacketInFlight {
+
+ TxnHeader hdr;
+ Record rec;
+ TxnDigest digest;
+
+ }
+
+ QuorumPeer self;
+ LearnerZooKeeperServer zk;
+
+ protected BufferedOutputStream bufferedOutput;
+
+ protected Socket sock;
+ protected MultipleAddresses leaderAddr;
+ protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
+
+ /**
+ * Socket getter
+ */
+ public Socket getSocket() {
+ return sock;
+ }
+
+ LearnerSender sender = null;
+ protected InputArchive leaderIs;
+ protected OutputArchive leaderOs;
+ /** the protocol version of the leader */
+ protected int leaderProtocolVersion = 0x01;
+
+ private static final int BUFFERED_MESSAGE_SIZE = 10;
+ protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
+ protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
+
+ /**
+ * Time to wait after connection attempt with the Leader or LearnerMaster before this
+ * Learner tries to connect again.
+ */
+ private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100);
+
+ private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
+
+ public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
+ private static boolean asyncSending =
+ Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
+ public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync";
+ public static final boolean closeSocketAsync = Boolean
+ .parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC));
+
+ static {
+ LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
+ LOG.info("TCP NoDelay set to: {}", nodelay);
+ LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+ LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
+ }
+
+ final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
+
+ public int getPendingRevalidationsCount() {
+ return pendingRevalidations.size();
+ }
+
+ // for testing
+ protected static void setAsyncSending(boolean newMode) {
+ asyncSending = newMode;
+ LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+
+ }
+ protected static boolean getAsyncSending() {
+ return asyncSending;
+ }
+ /**
+ * validate a session for a client
+ *
+ * @param clientId
+ * the client to be revalidated
+ * @param timeout
+ * the timeout for which the session is valid
+ * @throws IOException
+ */
+ void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
+ LOG.info("Revalidating client: 0x{}", Long.toHexString(clientId));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeLong(clientId);
+ dos.writeInt(timeout);
+ dos.close();
+ QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
+ pendingRevalidations.put(clientId, cnxn);
+ if (LOG.isTraceEnabled()) {
+ ZooTrace.logTraceMessage(
+ LOG,
+ ZooTrace.SESSION_TRACE_MASK,
+ "To validate session 0x" + Long.toHexString(clientId));
+ }
+ writePacket(qp, true);
+ }
+
+ /**
+ * write a packet to the leader.
+ *
+ * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
+ * When packets are sent synchronously, writing is done within a synchronization block.
+ * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe.
+ * Reading from this BlockingQueue and writing to leaderOs is done by the learner sender thread only.
+ * So we have only one thread writing to leaderOs at a time in either case.
+ *
+ * @param pp
+ * the proposal packet to be sent to the leader
+ * @throws IOException
+ */
+ void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+ if (asyncSending) {
+ sender.queuePacket(pp);
+ } else {
+ writePacketNow(pp, flush);
+ }
+ }
+
+ void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
+ synchronized (leaderOs) {
+ if (pp != null) {
+ messageTracker.trackSent(pp.getType());
+ leaderOs.writeRecord(pp, "packet");
+ }
+ if (flush) {
+ bufferedOutput.flush();
+ }
+ }
+ }
+
+ /**
+ * Start thread that will forward any packet in the queue to the leader
+ */
+ protected void startSendingThread() {
+ sender = new LearnerSender(this);
+ sender.start();
+ }
+
+ /**
+ * read a packet from the leader
+ *
+ * @param pp
+ * the packet to be instantiated
+ * @throws IOException
+ */
+ void readPacket(QuorumPacket pp) throws IOException {
+ synchronized (leaderIs) {
+ leaderIs.readRecord(pp, "packet");
+ messageTracker.trackReceived(pp.getType());
+ }
+ if (LOG.isTraceEnabled()) {
+ final long traceMask =
+ (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
+ : ZooTrace.SERVER_PACKET_TRACE_MASK;
+
+ ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
+ }
+ }
+
+ /**
+ * send a request packet to the leader
+ *
+ * @param request
+ * the request from the client
+ * @throws IOException
+ */
+ void request(Request request) throws IOException {
+ if (request.isThrottled()) {
+ LOG.error("Throttled request sent to leader: {}. Exiting", request);
+ ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream oa = new DataOutputStream(baos);
+ oa.writeLong(request.sessionId);
+ oa.writeInt(request.cxid);
+ oa.writeInt(request.type);
+ if (request.request != null) {
+ request.request.rewind();
+ int len = request.request.remaining();
+ byte[] b = new byte[len];
+ request.request.get(b);
+ request.request.rewind();
+ oa.write(b);
+ }
+ oa.close();
+ QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
+ writePacket(qp, true);
+ }
+
+ /**
+ * Returns the address of the node we think is the leader.
+ */
+ protected QuorumServer findLeader() {
+ QuorumServer leaderServer = null;
+ // Find the leader by id
+ Vote current = self.getCurrentVote();
+ for (QuorumServer s : self.getView().values()) {
+ if (s.id == current.getId()) {
+ // Ensure we have the leader's correct IP address before
+ // attempting to connect.
+ s.recreateSocketAddresses();
+ leaderServer = s;
+ break;
+ }
+ }
+ if (leaderServer == null) {
+ LOG.warn("Couldn't find the leader with id = {}", current.getId());
+ }
+ return leaderServer;
+ }
+
+ /**
+ * Overridable helper method to return System.nanoTime().
+ * This method behaves identically to System.nanoTime().
+ */
+ protected long nanoTime() {
+ return System.nanoTime();
+ }
+
+ /**
+ * Overridable helper method to simply call sock.connect(). This can be
+ * overridden in tests to fake connection success/failure for connectToLeader.
+ */
+ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
+ sock.connect(addr, timeout);
+ }
+
+ /**
+ * Establish a connection with the LearnerMaster found by findLearnerMaster.
+ * Followers only connect to Leaders; Observers can connect to any active LearnerMaster.
+ * Retries until either initLimit time has elapsed or 5 tries have happened.
+ * @param multiAddr - the address of the Peer to connect to.
+ * @throws IOException - if the socket connection fails on the 5th attempt,
+ * or if there is an authentication failure while connecting to the leader
+ */
+ protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
+
+ this.leaderAddr = multiAddr;
+ Set<InetSocketAddress> addresses;
+ if (self.isMultiAddressReachabilityCheckEnabled()) {
+ // even if none of the addresses are reachable, we want to try to establish connection
+ // see ZOOKEEPER-3758
+ addresses = multiAddr.getAllReachableAddressesOrAll();
+ } else {
+ addresses = multiAddr.getAllAddresses();
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
+ CountDownLatch latch = new CountDownLatch(addresses.size());
+ AtomicReference<Socket> socket = new AtomicReference<>(null);
+ addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while trying to connect to Leader", e);
+ } finally {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+ LOG.error("not all the LeaderConnector terminated properly");
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
+ }
+ }
+
+ if (socket.get() == null) {
+ throw new IOException("Failed connect to " + multiAddr);
+ } else {
+ sock = socket.get();
+ sockBeingClosed.set(false);
+ }
+
+ self.authLearner.authenticate(sock, hostname);
+
+ leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
+ bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
+ leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+ if (asyncSending) {
+ startSendingThread();
+ }
+ }
+
+ class LeaderConnector implements Runnable {
+
+ private AtomicReference<Socket> socket;
+ private InetSocketAddress address;
+ private CountDownLatch latch;
+
+ LeaderConnector(InetSocketAddress address, AtomicReference<Socket> socket, CountDownLatch latch) {
+ this.address = address;
+ this.socket = socket;
+ this.latch = latch;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.currentThread().setName("LeaderConnector-" + address);
+ Socket sock = connectToLeader();
+
+ if (sock != null && sock.isConnected()) {
+ if (socket.compareAndSet(null, sock)) {
+ LOG.info("Successfully connected to leader, using address: {}", address);
+ } else {
+ LOG.info("Connection to the leader is already established, close the redundant connection");
+ sock.close();
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error("Failed connect to {}", address, e);
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ private Socket connectToLeader() throws IOException, X509Exception, InterruptedException {
+ Socket sock = createSocket();
+
+ // leader connection timeout defaults to tickTime * initLimit
+ int connectTimeout = self.tickTime * self.initLimit;
+
+ // but if connectToLearnerMasterLimit is specified, use that value to calculate
+ // timeout instead of using the initLimit value
+ if (self.connectToLearnerMasterLimit > 0) {
+ connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
+ }
+
+ int remainingTimeout;
+ long startNanoTime = nanoTime();
+
+ for (int tries = 0; tries < 5 && socket.get() == null; tries++) {
+ try {
+ // recalculate the init limit time because retries sleep for 1000 milliseconds
+ remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
+ if (remainingTimeout <= 0) {
+ LOG.error("connectToLeader exceeded on retries.");
+ throw new IOException("connectToLeader exceeded on retries.");
+ }
+
+ sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout));
+ if (self.isSslQuorum()) {
+ ((SSLSocket) sock).startHandshake();
+ }
+ sock.setTcpNoDelay(nodelay);
+ break;
+ } catch (IOException e) {
+ remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
+
+ if (remainingTimeout <= leaderConnectDelayDuringRetryMs) {
+ LOG.error(
+ "Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}",
+ tries,
+ remainingTimeout,
+ address,
+ e);
+ throw e;
+ } else if (tries >= 4) {
+ LOG.error(
+ "Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}",
+ tries,
+ remainingTimeout,
+ address,
+ e);
+ throw e;
+ } else {
+ LOG.warn(
+ "Unexpected exception, tries={}, remaining init limit={}, connecting to {}",
+ tries,
+ remainingTimeout,
+ address,
+ e);
+ sock = createSocket();
+ }
+ }
+ Thread.sleep(leaderConnectDelayDuringRetryMs);
+ }
+
+ return sock;
+ }
+ }
+
+ /**
+ * Creates a plain or an SSL socket, depending on whether the quorum uses TLS.
+ * This can be overridden in tests to fake already connected sockets for connectToLeader.
+ */
+ protected Socket createSocket() throws X509Exception, IOException {
+ Socket sock;
+ if (self.isSslQuorum()) {
+ sock = self.getX509Util().createSSLSocket();
+ } else {
+ sock = new Socket();
+ }
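+ // Read timeout for the initial sync phase; it is tightened to tickTime * syncLimit once
+ // NEWLEADER is processed in syncWithLeader().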
+ sock.setSoTimeout(self.tickTime * self.initLimit);
+ return sock;
+ }
+
+ /**
+ * Once connected to the leader or learner master, perform the handshake
+ * protocol to establish a following / observing connection.
+ * @param pktType Leader.FOLLOWERINFO or Leader.OBSERVERINFO, depending on whether this learner is a follower or an observer
+ * @return the zxid the Leader sends for synchronization purposes.
+ * @throws IOException if the handshake with the leader fails
+ */
+ protected long registerWithLeader(int pktType) throws IOException {
+ /*
+ * Send follower info, including last zxid and sid
+ */
+ long lastLoggedZxid = self.getLastLoggedZxid();
+ QuorumPacket qp = new QuorumPacket();
+ qp.setType(pktType);
+ qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
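+ // A zxid packs the 32-bit epoch into its high word and a counter into its low word, so this
+ // advertises our accepted epoch with a zero counter.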
+
+ /*
+ * Add sid to payload
+ */
+ LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
+ ByteArrayOutputStream bsid = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
+ boa.writeRecord(li, "LearnerInfo");
+ qp.setData(bsid.toByteArray());
+
+ writePacket(qp, true);
+ readPacket(qp);
+ final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
+ if (qp.getType() == Leader.LEADERINFO) {
+ // we are connected to a 1.0 server so accept the new epoch and read the next packet
+ leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
+ byte[] epochBytes = new byte[4];
+ final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
+ if (newEpoch > self.getAcceptedEpoch()) {
+ wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
+ self.setAcceptedEpoch(newEpoch);
+ } else if (newEpoch == self.getAcceptedEpoch()) {
+ // since we have already acked an epoch equal to the leader's, we cannot ack
+ // again, but we still need to send our lastZxid to the leader so that we can
+ // sync with it if it does assume leadership of the epoch.
+ // The -1 indicates that this reply should not count as an ack for the new epoch.
+ wrappedEpochBytes.putInt(-1);
+ } else {
+ throw new IOException("Leaders epoch, "
+ + newEpoch
+ + " is less than accepted epoch, "
+ + self.getAcceptedEpoch());
+ }
+ QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
+ writePacket(ackNewEpoch, true);
+ return ZxidUtils.makeZxid(newEpoch, 0);
+ } else {
+ if (newEpoch > self.getAcceptedEpoch()) {
+ self.setAcceptedEpoch(newEpoch);
+ }
+ if (qp.getType() != Leader.NEWLEADER) {
+ LOG.error("First packet should have been NEWLEADER");
+ throw new IOException("First packet should have been NEWLEADER");
+ }
+ return qp.getZxid();
+ }
+ }
+
+ /**
+ * Finally, synchronize our history with the Leader (if Follower)
+ * or the LearnerMaster (if Observer).
+ * @param newLeaderZxid the zxid of the new epoch, as returned by registerWithLeader
+ * @throws IOException if reading from or writing to the leader fails
+ * @throws InterruptedException if the thread is interrupted while syncing
+ */
+ protected void syncWithLeader(long newLeaderZxid) throws Exception {
+ QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
+ QuorumPacket qp = new QuorumPacket();
+ long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
+
+ QuorumVerifier newLeaderQV = null;
+
+ // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
+ // For SNAP and TRUNC the snapshot is needed to save that history
+ boolean snapshotNeeded = true;
+ boolean syncSnapshot = false;
+ readPacket(qp);
+ Deque<Long> packetsCommitted = new ArrayDeque<>();
+ Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
+ Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
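+ // Sync-phase bookkeeping: packetsNotCommitted holds proposals still waiting for their COMMIT,
+ // packetsNotLogged holds proposals not yet written to the txn log, and packetsCommitted holds
+ // zxids committed before we start serving.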
+ synchronized (zk) {
+ if (qp.getType() == Leader.DIFF) {
+ LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
+ self.setSyncMode(QuorumPeer.SyncMode.DIFF);
+ if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
+ LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
+ snapshotNeeded = true;
+ syncSnapshot = true;
+ } else {
+ snapshotNeeded = false;
+ }
+ } else if (qp.getType() == Leader.SNAP) {
+ self.setSyncMode(QuorumPeer.SyncMode.SNAP);
+ LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
+ // The leader is going to dump the database;
+ // the db is cleared as part of deserializeSnapshot()
+ zk.getZKDatabase().deserializeSnapshot(leaderIs);
+ // ZOOKEEPER-2819: overwrite config node content extracted
+ // from leader snapshot with local config, to avoid potential
+ // inconsistency of config node content during rolling restart.
+ if (!self.isReconfigEnabled()) {
+ LOG.debug("Reset config node content from local config after deserialization of snapshot.");
+ zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
+ }
+ String signature = leaderIs.readString("signature");
+ if (!signature.equals("BenWasHere")) {
+ LOG.error("Missing signature. Got {}", signature);
+ throw new IOException("Missing signature");
+ }
+ zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+ // immediately persist the latest snapshot when there is a txn log gap
+ syncSnapshot = true;
+ } else if (qp.getType() == Leader.TRUNC) {
+ // we need to truncate the log to the last zxid of the leader
+ self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
+ LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
+ boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
+ if (!truncated) {
+ // not able to truncate the log
+ LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
+ ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
+ }
+ zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+ } else {
+ LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
+ ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
+ }
+ zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
+ zk.createSessionTracker();
+
+ long lastQueued = 0;
+
+ // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre-V1.0
+ // we take the snapshot on the UPTODATE message; since Zab V1.0 also gets the UPTODATE (after the NEWLEADER),
+ // we need to make sure that we don't take the snapshot twice.
+ boolean isPreZAB1_0 = true;
+ // If we are not going to take the snapshot, make sure the transactions are not applied in memory
+ // but are written out to the transaction log
+ boolean writeToTxnLog = !snapshotNeeded;
+ TxnLogEntry logEntry;
+ // we are now going to start getting transactions to apply followed by an UPTODATE
+ outerLoop:
+ while (self.isRunning()) {
+ readPacket(qp);
+ switch (qp.getType()) {
+ case Leader.PROPOSAL:
+ PacketInFlight pif = new PacketInFlight();
+ logEntry = SerializeUtils.deserializeTxn(qp.getData());
+ pif.hdr = logEntry.getHeader();
+ pif.rec = logEntry.getTxn();
+ pif.digest = logEntry.getDigest();
+ if (pif.hdr.getZxid() != lastQueued + 1) {
+ LOG.warn(
+ "Got zxid 0x{} expected 0x{}",
+ Long.toHexString(pif.hdr.getZxid()),
+ Long.toHexString(lastQueued + 1));
+ }
+ lastQueued = pif.hdr.getZxid();
+
+ if (pif.hdr.getType() == OpCode.reconfig) {
+ SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
+ QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
+ self.setLastSeenQuorumVerifier(qv, true);
+ }
+
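+ // Track the proposal twice: in packetsNotLogged until it reaches the txn log, and in
+ // packetsNotCommitted until its COMMIT arrives.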
+ packetsNotLogged.add(pif);
+ packetsNotCommitted.add(pif);
+ break;
+ case Leader.COMMIT:
+ case Leader.COMMITANDACTIVATE:
+ pif = packetsNotCommitted.peekFirst();
+ if (pif.hdr.getZxid() != qp.getZxid()) {
+ LOG.warn(
+ "Committing 0x{}, but next proposal is 0x{}",
+ Long.toHexString(qp.getZxid()),
+ Long.toHexString(pif.hdr.getZxid()));
+ } else {
+ if (qp.getType() == Leader.COMMITANDACTIVATE) {
+ QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
+ boolean majorChange = self.processReconfig(
+ qv,
+ ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
+ true);
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
+ }
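+ // Before the snapshot decision point (writeToTxnLog == false) committed txns are applied
+ // straight to the in-memory database; afterwards they are left for the txn log and only
+ // their zxid is queued for a later commit.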
+ if (!writeToTxnLog) {
+ zk.processTxn(pif.hdr, pif.rec);
+ packetsNotLogged.remove();
+ packetsNotCommitted.remove();
+ } else {
+ packetsNotCommitted.remove();
+ packetsCommitted.add(qp.getZxid());
+ }
+ }
+ break;
+ case Leader.INFORM:
+ case Leader.INFORMANDACTIVATE:
+ PacketInFlight packet = new PacketInFlight();
+
+ if (qp.getType() == Leader.INFORMANDACTIVATE) {
+ ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
+ long suggestedLeaderId = buffer.getLong();
+ byte[] remainingdata = new byte[buffer.remaining()];
+ buffer.get(remainingdata);
+ logEntry = SerializeUtils.deserializeTxn(remainingdata);
+ packet.hdr = logEntry.getHeader();
+ packet.rec = logEntry.getTxn();
+ packet.digest = logEntry.getDigest();
+ QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8));
+ boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
+ } else {
+ logEntry = SerializeUtils.deserializeTxn(qp.getData());
+ packet.rec = logEntry.getTxn();
+ packet.hdr = logEntry.getHeader();
+ packet.digest = logEntry.getDigest();
+ // Log warning message if txn comes out-of-order
+ if (packet.hdr.getZxid() != lastQueued + 1) {
+ LOG.warn(
+ "Got zxid 0x{} expected 0x{}",
+ Long.toHexString(packet.hdr.getZxid()),
+ Long.toHexString(lastQueued + 1));
+ }
+ lastQueued = packet.hdr.getZxid();
+ }
+ if (!writeToTxnLog) {
+ // Apply to db directly if we haven't taken the snapshot
+ zk.processTxn(packet.hdr, packet.rec);
+ } else {
+ packetsNotLogged.add(packet);
+ packetsCommitted.add(qp.getZxid());
+ }
+
+ break;
+ case Leader.UPTODATE:
+ LOG.info("Learner received UPTODATE message");
+ if (newLeaderQV != null) {
+ boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
+ }
+ if (isPreZAB1_0) {
+ zk.takeSnapshot(syncSnapshot);
+ self.setCurrentEpoch(newEpoch);
+ }
+ self.setZooKeeperServer(zk);
+ self.adminServer.setZooKeeperServer(zk);
+ break outerLoop;
+ case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
+ // means this is Zab 1.0
+ LOG.info("Learner received NEWLEADER message");
+ if (qp.getData() != null && qp.getData().length > 1) {
+ try {
+ QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8));
+ self.setLastSeenQuorumVerifier(qv, true);
+ newLeaderQV = qv;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ if (snapshotNeeded) {
+ zk.takeSnapshot(syncSnapshot);
+ }
+
+ self.setCurrentEpoch(newEpoch);
+ writeToTxnLog = true;
+ // Anything after this needs to go to the transaction log, not be applied directly in memory
+ isPreZAB1_0 = false;
+
+ // ZOOKEEPER-3911: make sure the uncommitted txn logs are synced to disk before we ACK the NEWLEADER.
+ sock.setSoTimeout(self.tickTime * self.syncLimit);
+ self.setSyncMode(QuorumPeer.SyncMode.NONE);
+ zk.startupWithoutServing();
+ if (zk instanceof FollowerZooKeeperServer) {
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+ fzk.syncProcessor.setDelayForwarding(true);
+ for (PacketInFlight p : packetsNotLogged) {
+ fzk.logRequest(p.hdr, p.rec, p.digest);
+ }
+ packetsNotLogged.clear();
+ fzk.syncProcessor.syncFlush();
+ }
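+ // ZOOKEEPER-3911 sequencing: the pending proposals are written and flushed to the txn log first,
+ // while forwarding (and therefore their individual ACKs) is held back until after the NEWLEADER
+ // ACK below, where delay forwarding is switched off again.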
+
+ writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
+
+ if (zk instanceof FollowerZooKeeperServer) {
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+ fzk.syncProcessor.setDelayForwarding(false);
+ fzk.syncProcessor.syncFlush();
+ }
+ break;
+ }
+ }
+ }
+ ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
+ writePacket(ack, true);
+ zk.startServing();
+ /*
+ * Update the election vote here to ensure that all members of the
+ * ensemble report the same vote to new servers that start up and
+ * send leader election notifications to the ensemble.
+ *
+ * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
+ */
+ self.updateElectionVote(newEpoch);
+
+ // We need to log the requests that arrived between the snapshot and the UPTODATE
+ if (zk instanceof FollowerZooKeeperServer) {
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+ for (PacketInFlight p : packetsNotLogged) {
+ fzk.logRequest(p.hdr, p.rec, p.digest);
+ }
+ for (Long zxid : packetsCommitted) {
+ fzk.commit(zxid);
+ }
+ } else if (zk instanceof ObserverZooKeeperServer) {
+ // Similar to follower, we need to log requests between the snapshot
+ // and UPTODATE
+ ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
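+ // Observers replay the pending requests directly through commitRequest, matching each one
+ // against the committed zxids collected above.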
+ for (PacketInFlight p : packetsNotLogged) {
+ Long zxid = packetsCommitted.peekFirst();
+ if (p.hdr.getZxid() != zxid) {
+ // log a warning if there is no matching commit:
+ // the old leader may have sent an outstanding proposal to the observer
+ LOG.warn(
+ "Committing 0x{}, but next proposal is 0x{}",
+ Long.toHexString(zxid),
+ Long.toHexString(p.hdr.getZxid()));
+ continue;
+ }
+ packetsCommitted.remove();
+ Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
+ request.setTxn(p.rec);
+ request.setHdr(p.hdr);
+ request.setTxnDigest(p.digest);
+ ozk.commitRequest(request);
+ }
+ } else {
+ // A new server type would need to handle in-flight packets here
+ throw new UnsupportedOperationException("Unknown server type");
+ }
+ }
+
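+ /**
+ * Handles a REVALIDATE reply from the leader: completes the pending session
+ * revalidation for the client connection that requested it.
+ */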
+ protected void revalidate(QuorumPacket qp) throws IOException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
+ DataInputStream dis = new DataInputStream(bis);
+ long sessionId = dis.readLong();
+ boolean valid = dis.readBoolean();
+ ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
+ if (cnxn == null) {
+ LOG.warn("Missing session 0x{} for validation", Long.toHexString(sessionId));
+ } else {
+ zk.finishSessionInit(cnxn, valid);
+ }
+ if (LOG.isTraceEnabled()) {
+ ZooTrace.logTraceMessage(
+ LOG,
+ ZooTrace.SESSION_TRACE_MASK,
+ "Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid);
+ }
+ }
+
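+ /**
+ * Replies to a leader ping by attaching this learner's session touch snapshot
+ * (session id to timeout), so the leader can keep those client sessions alive.
+ */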
+ protected void ping(QuorumPacket qp) throws IOException {
+ // Send back the ping with our session data
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ Map<Long, Integer> touchTable = zk.getTouchSnapshot();
+ for (Entry<Long, Integer> entry : touchTable.entrySet()) {
+ dos.writeLong(entry.getKey());
+ dos.writeInt(entry.getValue());
+ }
+
+ QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
+ writePacket(pingReply, true);
+ }
+
+ /**
+ * Shutdown the Peer
+ */
+ public void shutdown() {
+ self.setZooKeeperServer(null);
+ self.closeAllConnections();
+ self.adminServer.setZooKeeperServer(null);
+
+ if (sender != null) {
+ sender.shutdown();
+ }
+
+ closeSocket();
+ // shutdown previous zookeeper
+ if (zk != null) {
+ // If we haven't finished the SNAP sync, force a full shutdown
+ // to avoid potential inconsistency
+ zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
+ }
+ }
+
+ boolean isRunning() {
+ return self.isRunning() && zk.isRunning();
+ }
+
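+ // Closes the connection to the leader at most once (guarded by sockBeingClosed). When
+ // closeSocketAsync is set, the close runs on a daemon thread, presumably so a slow close
+ // (e.g. a TLS shutdown) does not block the caller.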
+ void closeSocket() {
+ if (sock != null) {
+ if (sockBeingClosed.compareAndSet(false, true)) {
+ if (closeSocketAsync) {
+ final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId() + ")");
+ closingThread.setDaemon(true);
+ closingThread.start();
+ } else {
+ closeSockSync();
+ }
+ }
+ }
+ }
+
+ void closeSockSync() {
+ try {
+ long startTime = Time.currentElapsedTime();
+ if (sock != null) {
+ sock.close();
+ sock = null;
+ }
+ ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime);
+ } catch (IOException e) {
+ LOG.warn("Ignoring error closing connection to leader", e);
+ }
+ }
+
+}
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
index c1399e53083..3b7a9dfc331 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
@@ -67,7 +67,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
Socket socket = learner.sock;
- if ( socket != null && ! learner.sock.isClosed()) {
+ if (socket != null && !socket.isClosed()) {
learner.sock.close();
}
} catch (IOException e1) {