diff options
40 files changed, 1780 insertions, 752 deletions
diff --git a/client/go/cmd/logfmt/cmd.go b/client/go/cmd/logfmt/cmd.go index b549c5fa7ef..84322c7ff08 100644 --- a/client/go/cmd/logfmt/cmd.go +++ b/client/go/cmd/logfmt/cmd.go @@ -38,7 +38,7 @@ and converts it to something human-readable`, cmd.Flags().StringVarP(&curOptions.OnlyHostname, "host", "H", "", "select only one host") cmd.Flags().StringVarP(&curOptions.OnlyPid, "pid", "p", "", "select only one process ID") cmd.Flags().StringVarP(&curOptions.OnlyService, "service", "S", "", "select only one service") - cmd.Flags().VarP(&curOptions.Format, "format", "F", "select logfmt output format, vespa (default), json or raw are supported") + cmd.Flags().VarP(&curOptions.Format, "format", "F", "select logfmt output format, vespa (default), json or raw are supported. The json output format is not stable, and will change in the future.") cmd.Flags().MarkHidden("tc") cmd.Flags().MarkHidden("ts") cmd.Flags().MarkHidden("dequotenewlines") diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json index 5985e79b786..d324656abf2 100644 --- a/container-core/abi-spec.json +++ b/container-core/abi-spec.json @@ -1051,8 +1051,6 @@ "public com.yahoo.jdisc.http.ConnectorConfig$Builder healthCheckProxy(java.util.function.Consumer)", "public com.yahoo.jdisc.http.ConnectorConfig$Builder proxyProtocol(com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder)", "public com.yahoo.jdisc.http.ConnectorConfig$Builder proxyProtocol(java.util.function.Consumer)", - "public com.yahoo.jdisc.http.ConnectorConfig$Builder secureRedirect(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder)", - "public com.yahoo.jdisc.http.ConnectorConfig$Builder secureRedirect(java.util.function.Consumer)", "public com.yahoo.jdisc.http.ConnectorConfig$Builder maxRequestsPerConnection(int)", "public com.yahoo.jdisc.http.ConnectorConfig$Builder maxConnectionLife(double)", "public com.yahoo.jdisc.http.ConnectorConfig$Builder http2Enabled(boolean)", @@ -1074,7 +1072,6 @@ "public com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer$Builder tlsClientAuthEnforcer", "public com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy$Builder healthCheckProxy", "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol$Builder proxyProtocol", - "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder secureRedirect", "public com.yahoo.jdisc.http.ConnectorConfig$Http2$Builder http2", "public com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder serverName" ] @@ -1193,37 +1190,6 @@ ], "fields": [] }, - "com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder": { - "superClass": "java.lang.Object", - "interfaces": [ - "com.yahoo.config.ConfigBuilder" - ], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>()", - "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect)", - "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder enabled(boolean)", - "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder port(int)", - "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect build()" - ], - "fields": [] - }, - "com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect": { - "superClass": "com.yahoo.config.InnerNode", - "interfaces": [], - "attributes": [ - "public", - "final" - ], - "methods": [ - "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect$Builder)", - "public boolean enabled()", - "public int port()" - ], - "fields": [] - }, "com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder": { "superClass": "java.lang.Object", "interfaces": [ @@ -1236,9 +1202,13 @@ "public void <init>()", "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$ServerName)", "public com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder fallback(java.lang.String)", + "public com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder allowed(java.lang.String)", + "public com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder allowed(java.util.Collection)", "public com.yahoo.jdisc.http.ConnectorConfig$ServerName build()" ], - "fields": [] + "fields": [ + "public java.util.List allowed" + ] }, "com.yahoo.jdisc.http.ConnectorConfig$ServerName": { "superClass": "com.yahoo.config.InnerNode", @@ -1249,7 +1219,9 @@ ], "methods": [ "public void <init>(com.yahoo.jdisc.http.ConnectorConfig$ServerName$Builder)", - "public java.lang.String fallback()" + "public java.lang.String fallback()", + "public java.util.List allowed()", + "public java.lang.String allowed(int)" ], "fields": [] }, @@ -1443,7 +1415,6 @@ "public com.yahoo.jdisc.http.ConnectorConfig$TlsClientAuthEnforcer tlsClientAuthEnforcer()", "public com.yahoo.jdisc.http.ConnectorConfig$HealthCheckProxy healthCheckProxy()", "public com.yahoo.jdisc.http.ConnectorConfig$ProxyProtocol proxyProtocol()", - "public com.yahoo.jdisc.http.ConnectorConfig$SecureRedirect secureRedirect()", "public int maxRequestsPerConnection()", "public double maxConnectionLife()", "public boolean http2Enabled()", diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java index bf278981b69..6282e334409 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java @@ -9,7 +9,6 @@ import com.yahoo.jdisc.http.ssl.impl.DefaultConnectorSsl; import com.yahoo.security.tls.MixedMode; import com.yahoo.security.tls.TransportSecurityUtils; import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; -import org.eclipse.jetty.http.HttpCompliance; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; @@ -57,7 +56,6 @@ public class ConnectorFactory { // e.g. due to TLS configuration through environment variables. private static void runtimeConnectorConfigValidation(ConnectorConfig config) { validateProxyProtocolConfiguration(config); - validateSecureRedirectConfig(config); } private static void validateProxyProtocolConfiguration(ConnectorConfig config) { @@ -70,28 +68,15 @@ public class ConnectorFactory { } } - private static void validateSecureRedirectConfig(ConnectorConfig config) { - if (config.secureRedirect().enabled() && isSslEffectivelyEnabled(config)) { - throw new IllegalArgumentException("Secure redirect can only be enabled on connectors without HTTPS"); - } - } - public ConnectorConfig getConnectorConfig() { return connectorConfig; } public ServerConnector createConnector(final Metric metric, final Server server, JettyConnectionLogger connectionLogger, ConnectionMetricAggregator connectionMetricAggregator) { - ServerConnector connector = new JDiscServerConnector( + return new JDiscServerConnector( connectorConfig, metric, server, connectionLogger, connectionMetricAggregator, createConnectionFactories(metric).toArray(ConnectionFactory[]::new)); - connector.setPort(connectorConfig.listenPort()); - connector.setName(connectorConfig.name()); - connector.setAcceptQueueSize(connectorConfig.acceptQueueSize()); - connector.setReuseAddress(connectorConfig.reuseAddress()); - connector.setIdleTimeout(toMillis(connectorConfig.idleTimeout())); - connector.addBean(HttpCompliance.RFC7230); - return connector; } private List<ConnectionFactory> createConnectionFactories(Metric metric) { diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollector.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollector.java index 22c5b2ebfdb..3fb81cb5352 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollector.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollector.java @@ -3,6 +3,7 @@ package com.yahoo.jdisc.http.server.jetty; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.http.HttpRequest; +import com.yahoo.jdisc.http.ServerConfig; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.server.AsyncContextEvent; @@ -22,7 +23,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -52,13 +52,20 @@ class HttpResponseStatisticsCollector extends HandlerWrapper implements Graceful private final AtomicReference<FutureCallback> shutdown = new AtomicReference<>(); private final List<String> monitoringHandlerPaths; private final List<String> searchHandlerPaths; + private final Set<String> ignoredUserAgents; private final AtomicLong inFlight = new AtomicLong(); private final ConcurrentMap<StatusCodeMetric, LongAdder> statistics = new ConcurrentHashMap<>(); - HttpResponseStatisticsCollector(List<String> monitoringHandlerPaths, List<String> searchHandlerPaths) { + HttpResponseStatisticsCollector(ServerConfig.Metric cfg) { + this(cfg.monitoringHandlerPaths(), cfg.searchHandlerPaths(), cfg.ignoredUserAgents()); + } + + HttpResponseStatisticsCollector(List<String> monitoringHandlerPaths, List<String> searchHandlerPaths, + Collection<String> ignoredUserAgents) { this.monitoringHandlerPaths = monitoringHandlerPaths; this.searchHandlerPaths = searchHandlerPaths; + this.ignoredUserAgents = Set.copyOf(ignoredUserAgents); } private final AsyncListener completionWatcher = new AsyncListener() { @@ -108,12 +115,6 @@ class HttpResponseStatisticsCollector extends HandlerWrapper implements Graceful } } - void ignoreUserAgent(String agentName) { - ignoredUserAgents.add(agentName); - } - - private Set<String> ignoredUserAgents = new HashSet<>(); - private boolean shouldLogMetricsFor(Request request) { String agent = request.getHeader(HttpHeader.USER_AGENT.toString()); if (agent == null) return true; diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java index 79cdb8f67cf..49db22c3e38 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java @@ -3,6 +3,7 @@ package com.yahoo.jdisc.http.server.jetty; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.http.ConnectorConfig; +import org.eclipse.jetty.http.HttpCompliance; import org.eclipse.jetty.io.ConnectionStatistics; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.Server; @@ -50,6 +51,12 @@ class JDiscServerConnector extends ServerConnector { } addBean(connectionLogger); addBean(connectionMetricAggregator); + setPort(config.listenPort()); + setName(config.name()); + setAcceptQueueSize(config.acceptQueueSize()); + setReuseAddress(config.reuseAddress()); + setIdleTimeout((long) (config.idleTimeout() * 1000)); + addBean(HttpCompliance.RFC7230); } @Override diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java index 96c5bac335b..965575f8b30 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java @@ -14,11 +14,13 @@ import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.jmx.ConnectorServer; import org.eclipse.jetty.jmx.MBeanContainer; import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.server.handler.HandlerWrapper; import org.eclipse.jetty.server.handler.StatisticsHandler; import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.eclipse.jetty.server.handler.gzip.GzipHttpOutputInterceptor; @@ -83,20 +85,13 @@ public class JettyHttpServer extends AbstractServerProvider { listenedPorts.add(connectorConfig.listenPort()); } - JDiscContext jDiscContext = new JDiscContext(filterBindings, - container, - janitor, - metric, - serverConfig); + JDiscContext jDiscContext = new JDiscContext(filterBindings, container, janitor, metric, serverConfig); ServletHolder jdiscServlet = new ServletHolder(new JDiscHttpServlet(jDiscContext)); List<JDiscServerConnector> connectors = Arrays.stream(server.getConnectors()) .map(JDiscServerConnector.class::cast) .collect(toList()); - - server.setHandler(getHandlerCollection(serverConfig, - connectors, - jdiscServlet)); + server.setHandler(createRootHandler(serverConfig, connectors, jdiscServlet)); this.metricsReporter = new ServerMetricReporter(metric, server); } @@ -136,46 +131,36 @@ public class JettyHttpServer extends AbstractServerProvider { } } - private HandlerCollection getHandlerCollection(ServerConfig serverConfig, - List<JDiscServerConnector> connectors, - ServletHolder jdiscServlet) { - ServletContextHandler servletContextHandler = createServletContextHandler(); - servletContextHandler.addServlet(jdiscServlet, "/*"); - - List<ConnectorConfig> connectorConfigs = connectors.stream().map(JDiscServerConnector::connectorConfig).collect(toList()); - var secureRedirectHandler = new SecuredRedirectHandler(connectorConfigs); - secureRedirectHandler.setHandler(servletContextHandler); - - var proxyHandler = new HealthCheckProxyHandler(connectors); - proxyHandler.setHandler(secureRedirectHandler); - - var authEnforcer = new TlsClientAuthenticationEnforcer(connectorConfigs); - authEnforcer.setHandler(proxyHandler); - - GzipHandler gzipHandler = newGzipHandler(serverConfig); - gzipHandler.setHandler(authEnforcer); - - HttpResponseStatisticsCollector statisticsCollector = - new HttpResponseStatisticsCollector(serverConfig.metric().monitoringHandlerPaths(), - serverConfig.metric().searchHandlerPaths()); - statisticsCollector.setHandler(gzipHandler); - for (String agent : serverConfig.metric().ignoredUserAgents()) { - statisticsCollector.ignoreUserAgent(agent); + private HandlerCollection createRootHandler( + ServerConfig serverCfg, List<JDiscServerConnector> connectors, ServletHolder jdiscServlet) { + List<ContextHandler> perConnectorHandlers = new ArrayList<>(); + for (JDiscServerConnector connector : connectors) { + ConnectorConfig connectorCfg = connector.connectorConfig(); + List<HandlerWrapper> chain = new ArrayList<>(); + chain.add(newGenericStatisticsHandler()); + chain.add(newResponseStatisticsHandler(serverCfg)); + chain.add(newGzipHandler(serverCfg)); + if (connectorCfg.tlsClientAuthEnforcer().enable()) { + chain.add(newTlsClientAuthEnforcerHandler(connectorCfg)); + } + if (connectorCfg.healthCheckProxy().enable()) { + chain.add(newHealthCheckProxyHandler(connectors)); + } else { + chain.add(newServletHandler(jdiscServlet)); + } + ContextHandler connectorRoot = newConnectorContextHandler(connector, connectorCfg); + addChainToRoot(connectorRoot, chain); + perConnectorHandlers.add(connectorRoot); } - - StatisticsHandler statisticsHandler = newStatisticsHandler(); - statisticsHandler.setHandler(statisticsCollector); - - HandlerCollection handlerCollection = new HandlerCollection(); - handlerCollection.setHandlers(new Handler[] { statisticsHandler }); - return handlerCollection; + return new ContextHandlerCollection(perConnectorHandlers.toArray(new ContextHandler[0])); } - private ServletContextHandler createServletContextHandler() { - ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS); - servletContextHandler.setContextPath("/"); - servletContextHandler.setDisplayName(getDisplayName(listenedPorts)); - return servletContextHandler; + private static void addChainToRoot(ContextHandler root, List<HandlerWrapper> chain) { + HandlerWrapper parent = root; + for (HandlerWrapper h : chain) { + parent.setHandler(h); + parent = h; + } } private static String getDisplayName(List<Integer> ports) { @@ -237,13 +222,44 @@ public class JettyHttpServer extends AbstractServerProvider { Server server() { return server; } - private StatisticsHandler newStatisticsHandler() { + private ServletContextHandler newServletHandler(ServletHolder servlet) { + var h = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS); + h.setContextPath("/"); + h.setDisplayName(getDisplayName(listenedPorts)); + h.addServlet(servlet, "/*"); + return h; + } + + private static ContextHandler newConnectorContextHandler(JDiscServerConnector connector, ConnectorConfig connectorCfg) { + ContextHandler ctxHandler = new ContextHandler(); + List<String> allowedServerNames = connectorCfg.serverName().allowed(); + if (allowedServerNames.isEmpty()) { + ctxHandler.setVirtualHosts(new String[]{"@%s".formatted(connector.getName())}); + } else { + ctxHandler.setVirtualHosts(allowedServerNames.toArray(new String[0])); + } + return ctxHandler; + } + + private static HealthCheckProxyHandler newHealthCheckProxyHandler(List<JDiscServerConnector> connectors) { + return new HealthCheckProxyHandler(connectors); + } + + private static TlsClientAuthenticationEnforcer newTlsClientAuthEnforcerHandler(ConnectorConfig cfg) { + return new TlsClientAuthenticationEnforcer(cfg.tlsClientAuthEnforcer()); + } + + private static HttpResponseStatisticsCollector newResponseStatisticsHandler(ServerConfig cfg) { + return new HttpResponseStatisticsCollector(cfg.metric()); + } + + private static StatisticsHandler newGenericStatisticsHandler() { StatisticsHandler statisticsHandler = new StatisticsHandler(); statisticsHandler.statsReset(); return statisticsHandler; } - private GzipHandler newGzipHandler(ServerConfig serverConfig) { + private static GzipHandler newGzipHandler(ServerConfig serverConfig) { GzipHandler gzipHandler = new GzipHandlerWithVaryHeaderFixed(); gzipHandler.setCompressionLevel(serverConfig.responseCompressionLevel()); gzipHandler.setInflateBufferSize(8 * 1024); diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java deleted file mode 100644 index e5dddf285ef..00000000000 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/SecuredRedirectHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.jdisc.http.server.jetty; - -import com.yahoo.jdisc.http.ConnectorConfig; -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.HandlerWrapper; -import org.eclipse.jetty.util.URIUtil; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static com.yahoo.jdisc.http.server.jetty.RequestUtils.getConnectorLocalPort; - -/** - * A secure redirect handler inspired by {@link org.eclipse.jetty.server.handler.SecuredRedirectHandler}. - * - * @author bjorncs - */ -class SecuredRedirectHandler extends HandlerWrapper { - - private static final String HEALTH_CHECK_PATH = "/status.html"; - - private final Map<Integer, Integer> redirectMap; - - SecuredRedirectHandler(List<ConnectorConfig> connectorConfigs) { - this.redirectMap = createRedirectMap(connectorConfigs); - } - - @Override - public void handle(String target, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException { - int localPort = getConnectorLocalPort(request); - if (!redirectMap.containsKey(localPort)) { - _handler.handle(target, request, servletRequest, servletResponse); - return; - } - servletResponse.setContentLength(0); - if (!servletRequest.getRequestURI().equals(HEALTH_CHECK_PATH)) { - servletResponse.sendRedirect( - URIUtil.newURI("https", request.getServerName(), redirectMap.get(localPort), request.getRequestURI(), request.getQueryString())); - } - request.setHandled(true); - } - - private static Map<Integer, Integer> createRedirectMap(List<ConnectorConfig> connectorConfigs) { - var redirectMap = new HashMap<Integer, Integer>(); - for (ConnectorConfig connectorConfig : connectorConfigs) { - if (connectorConfig.secureRedirect().enabled()) { - redirectMap.put(connectorConfig.listenPort(), connectorConfig.secureRedirect().port()); - } - } - return redirectMap; - } -} diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/TlsClientAuthenticationEnforcer.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/TlsClientAuthenticationEnforcer.java index ce949074bfa..b420aabc598 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/TlsClientAuthenticationEnforcer.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/TlsClientAuthenticationEnforcer.java @@ -11,11 +11,6 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static com.yahoo.jdisc.http.server.jetty.RequestUtils.getConnectorLocalPort; /** * A Jetty handler that enforces TLS client authentication with configurable white list. @@ -24,10 +19,11 @@ import static com.yahoo.jdisc.http.server.jetty.RequestUtils.getConnectorLocalPo */ class TlsClientAuthenticationEnforcer extends HandlerWrapper { - private final Map<Integer, List<String>> portToWhitelistedPathsMapping; + private final ConnectorConfig.TlsClientAuthEnforcer cfg; - TlsClientAuthenticationEnforcer(List<ConnectorConfig> connectorConfigs) { - portToWhitelistedPathsMapping = createWhitelistMapping(connectorConfigs); + TlsClientAuthenticationEnforcer(ConnectorConfig.TlsClientAuthEnforcer cfg) { + if (!cfg.enable()) throw new IllegalArgumentException(); + this.cfg = cfg; } @Override @@ -44,36 +40,11 @@ class TlsClientAuthenticationEnforcer extends HandlerWrapper { } } - private static Map<Integer, List<String>> createWhitelistMapping(List<ConnectorConfig> connectorConfigs) { - var mapping = new HashMap<Integer, List<String>>(); - for (ConnectorConfig connectorConfig : connectorConfigs) { - var enforcerConfig = connectorConfig.tlsClientAuthEnforcer(); - if (enforcerConfig.enable()) { - mapping.put(connectorConfig.listenPort(), enforcerConfig.pathWhitelist()); - } - } - return mapping; - } - - private boolean isRequest(Request request) { - return request.getDispatcherType() == DispatcherType.REQUEST; - } + private boolean isRequest(Request request) { return request.getDispatcherType() == DispatcherType.REQUEST; } private boolean isRequestToWhitelistedBinding(Request jettyRequest) { - int localPort = getConnectorLocalPort(jettyRequest); - List<String> whiteListedPaths = getWhitelistedPathsForPort(localPort); - if (whiteListedPaths == null) { - return true; // enforcer not enabled - } // Note: Same path definition as HttpRequestFactory.getUri() - return whiteListedPaths.contains(jettyRequest.getRequestURI()); - } - - private List<String> getWhitelistedPathsForPort(int localPort) { - if (portToWhitelistedPathsMapping.containsKey(0) && portToWhitelistedPathsMapping.size() == 1) { - return portToWhitelistedPathsMapping.get(0); // for unit tests which uses 0 for listen port - } - return portToWhitelistedPathsMapping.get(localPort); + return cfg.pathWhitelist().contains(jettyRequest.getRequestURI()); } private boolean isClientAuthenticated(HttpServletRequest servletRequest) { diff --git a/container-core/src/main/resources/configdefinitions/jdisc.http.jdisc.http.connector.def b/container-core/src/main/resources/configdefinitions/jdisc.http.jdisc.http.connector.def index 1f4763d32a7..ecbc451ead1 100644 --- a/container-core/src/main/resources/configdefinitions/jdisc.http.jdisc.http.connector.def +++ b/container-core/src/main/resources/configdefinitions/jdisc.http.jdisc.http.connector.def @@ -116,12 +116,6 @@ proxyProtocol.enabled bool default=false # Allow https in parallel with proxy protocol proxyProtocol.mixedMode bool default=false -# Redirect all requests to https port -secureRedirect.enabled bool default=false - -# Target port for redirect -secureRedirect.port int default=443 - # Maximum number of request per connection before server marks connections as non-persistent. Set to '0' to disable. maxRequestsPerConnection int default=0 @@ -138,3 +132,5 @@ http2.maxConcurrentStreams int default=4096 # Override the default server name when authority is missing from request. serverName.fallback string default="" +# The list of accepted server names. Empty list to accept any. Elements follows format of 'serverName.default'. +serverName.allowed[] string diff --git a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollectorTest.java b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollectorTest.java index 1f65bc4f582..165659389ec 100644 --- a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollectorTest.java +++ b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpResponseStatisticsCollectorTest.java @@ -25,6 +25,7 @@ import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.Set; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -38,7 +39,7 @@ public class HttpResponseStatisticsCollectorTest { private Connector connector; private List<String> monitoringPaths = List.of("/status.html"); private List<String> searchPaths = List.of("/search"); - private HttpResponseStatisticsCollector collector = new HttpResponseStatisticsCollector(monitoringPaths, searchPaths); + private HttpResponseStatisticsCollector collector = new HttpResponseStatisticsCollector(monitoringPaths, searchPaths, Set.of()); private int httpResponseCode = 500; @Test diff --git a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java index 2c5d36bd776..318067ac634 100644 --- a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java +++ b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerTest.java @@ -743,7 +743,7 @@ public class HttpServerTest { } @Test - void requestThatFallbackServerNameCanBeOverridden() throws Exception { + void fallbackServerNameCanBeOverridden() throws Exception { String fallbackHostname = "myhostname"; JettyTestDriver driver = JettyTestDriver.newConfiguredInstance( new UriRequestHandler(), @@ -752,13 +752,29 @@ public class HttpServerTest { .serverName(new ConnectorConfig.ServerName.Builder().fallback(fallbackHostname))); int listenPort = driver.server().getListenPort(); HttpGet req = new HttpGet("http://localhost:" + listenPort + "/"); - req.addHeader("Host", null); + req.setHeader("Host", null); driver.client().execute(req) .expectStatusCode(is(OK)) .expectContent(containsString("http://" + fallbackHostname + ":" + listenPort + "/")); assertTrue(driver.close()); } + @Test + void acceptedServerNamesCanBeRestricted() throws Exception { + String requiredServerName = "myhostname"; + JettyTestDriver driver = JettyTestDriver.newConfiguredInstance( + new EchoRequestHandler(), + new ServerConfig.Builder(), + new ConnectorConfig.Builder() + .serverName(new ConnectorConfig.ServerName.Builder().allowed(requiredServerName))); + int listenPort = driver.server().getListenPort(); + HttpGet req = new HttpGet("http://localhost:" + listenPort + "/"); + req.setHeader("Host", requiredServerName); + driver.client().execute(req).expectStatusCode(is(OK)); + driver.client().get("/").expectStatusCode(is(NOT_FOUND)); + assertTrue(driver.close()); + } + private static JettyTestDriver createSslWithTlsClientAuthenticationEnforcer(Path certificateFile, Path privateKeyFile) { ConnectorConfig.Builder connectorConfig = new ConnectorConfig.Builder() .tlsClientAuthEnforcer( diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java index 0d4b2c658cf..36036d6d36d 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java @@ -52,6 +52,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.Deque; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -767,8 +768,16 @@ public class JobController { .filter(versions::contains) // Don't deploy versions that are no longer known. .ifPresent(versions::add); - if (versions.isEmpty()) - throw new IllegalStateException("no deployable platform version found in the system"); + // Remove all versions that are older than the compile version. + versions.removeIf(version -> applicationPackage.compileVersion().map(version::isBefore).orElse(false)); + if (versions.isEmpty()) { + // Fall back to the newest deployable version, if all the ones with normal confidence were too old. + Iterator<VespaVersion> descending = reversed(versionStatus.deployableVersions()).iterator(); + if ( ! descending.hasNext()) + throw new IllegalStateException("no deployable platform version found in the system"); + else + versions.add(descending.next().versionNumber()); + } VersionCompatibility compatibility = controller.applications().versionCompatibility(id.applicationId()); List<Version> compatibleVersions = new ArrayList<>(); diff --git a/document/src/main/java/com/yahoo/document/json/ParsedDocumentOperation.java b/document/src/main/java/com/yahoo/document/json/ParsedDocumentOperation.java index eb171983cf1..a395973a55a 100644 --- a/document/src/main/java/com/yahoo/document/json/ParsedDocumentOperation.java +++ b/document/src/main/java/com/yahoo/document/json/ParsedDocumentOperation.java @@ -3,14 +3,49 @@ package com.yahoo.document.json; import com.yahoo.document.DocumentOperation; +import java.util.Objects; + /** * The result of JSON parsing a single document operation - * - * @param operation - * the parsed operation - * @param fullyApplied - * true if all the JSON content could be applied, - * false if some (or all) of the fields were not poresent in this document and was ignored */ -public record ParsedDocumentOperation(DocumentOperation operation, boolean fullyApplied) { +public final class ParsedDocumentOperation { + + private final DocumentOperation operation; + private final boolean fullyApplied; + + /** + * @param operation the parsed operation + * @param fullyApplied true if all the JSON content could be applied, + * false if some (or all) of the fields were not poresent in this document and was ignored + */ + public ParsedDocumentOperation(DocumentOperation operation, boolean fullyApplied) { + this.operation = operation; + this.fullyApplied = fullyApplied; + } + + public DocumentOperation operation() { return operation; } + + public boolean fullyApplied() { return fullyApplied; } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (ParsedDocumentOperation) obj; + return Objects.equals(this.operation, that.operation) && + this.fullyApplied == that.fullyApplied; + } + + @Override + public int hashCode() { + return Objects.hash(operation, fullyApplied); + } + + @Override + public String toString() { + return "ParsedDocumentOperation[" + + "operation=" + operation + ", " + + "fullyApplied=" + fullyApplied + ']'; + } + } diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index 5766b059d98..feac5d668e7 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -79,7 +79,7 @@ public class Flags { public static final UnboundBooleanFlag KEEP_STORAGE_NODE_UP = defineFeatureFlag( "keep-storage-node-up", true, - List.of("hakonhall"), "2022-07-07", "2022-10-07", + List.of("hakonhall"), "2022-07-07", "2022-11-07", "Whether to leave the storage node (with wanted state) UP while the node is permanently down.", "Takes effect immediately for nodes transitioning to permanently down.", ZONE_ID, APPLICATION_ID); diff --git a/searchlib/CMakeLists.txt b/searchlib/CMakeLists.txt index a7d831aa623..62aca6d68cc 100644 --- a/searchlib/CMakeLists.txt +++ b/searchlib/CMakeLists.txt @@ -225,6 +225,7 @@ vespa_define_module( src/tests/tensor/hnsw_saver src/tests/tensor/tensor_buffer_operations src/tests/tensor/tensor_buffer_store + src/tests/tensor/tensor_buffer_type_mapper src/tests/transactionlog src/tests/transactionlogstress src/tests/true diff --git a/searchlib/src/tests/tensor/direct_tensor_store/direct_tensor_store_test.cpp b/searchlib/src/tests/tensor/direct_tensor_store/direct_tensor_store_test.cpp index 1574e7d38f1..8b21952b2d1 100644 --- a/searchlib/src/tests/tensor/direct_tensor_store/direct_tensor_store_test.cpp +++ b/searchlib/src/tests/tensor/direct_tensor_store/direct_tensor_store_test.cpp @@ -62,7 +62,7 @@ public: } void expect_tensor(const Value* exp, EntryRef ref) { - const auto* act = store.get_tensor(ref); + const auto* act = store.get_tensor_ptr(ref); ASSERT_TRUE(act); EXPECT_EQ(exp, act); } @@ -81,7 +81,7 @@ TEST_F(DirectTensorStoreTest, heap_allocated_memory_is_tracked) store.store_tensor(make_tensor(5)); auto mem_1 = store.getMemoryUsage(); auto ref = store.store_tensor(make_tensor(10)); - auto tensor_mem_usage = store.get_tensor(ref)->get_memory_usage(); + auto tensor_mem_usage = store.get_tensor_ptr(ref)->get_memory_usage(); auto mem_2 = store.getMemoryUsage(); EXPECT_GT(tensor_mem_usage.usedBytes(), 500); EXPECT_LT(tensor_mem_usage.usedBytes(), 50000); @@ -93,14 +93,14 @@ TEST_F(DirectTensorStoreTest, heap_allocated_memory_is_tracked) TEST_F(DirectTensorStoreTest, invalid_ref_returns_nullptr) { - const auto* t = store.get_tensor(EntryRef()); + const auto* t = store.get_tensor_ptr(EntryRef()); EXPECT_FALSE(t); } TEST_F(DirectTensorStoreTest, hold_adds_entry_to_hold_list) { auto ref = store.store_tensor(make_tensor(5)); - auto tensor_mem_usage = store.get_tensor(ref)->get_memory_usage(); + auto tensor_mem_usage = store.get_tensor_ptr(ref)->get_memory_usage(); auto mem_1 = store.getMemoryUsage(); store.holdTensor(ref); auto mem_2 = store.getMemoryUsage(); diff --git a/searchlib/src/tests/tensor/tensor_buffer_type_mapper/CMakeLists.txt b/searchlib/src/tests/tensor/tensor_buffer_type_mapper/CMakeLists.txt new file mode 100644 index 00000000000..e219b17ebd1 --- /dev/null +++ b/searchlib/src/tests/tensor/tensor_buffer_type_mapper/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchlib_tensor_buffer_type_mapper_test_app TEST + SOURCES + tensor_buffer_type_mapper_test.cpp + DEPENDS + searchlib + GTest::GTest +) +vespa_add_test(NAME searchlib_tensor_buffer_type_mapper_test_app COMMAND searchlib_tensor_buffer_type_mapper_test_app) diff --git a/searchlib/src/tests/tensor/tensor_buffer_type_mapper/tensor_buffer_type_mapper_test.cpp b/searchlib/src/tests/tensor/tensor_buffer_type_mapper/tensor_buffer_type_mapper_test.cpp new file mode 100644 index 00000000000..8e88c103516 --- /dev/null +++ b/searchlib/src/tests/tensor/tensor_buffer_type_mapper/tensor_buffer_type_mapper_test.cpp @@ -0,0 +1,121 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/searchlib/tensor/tensor_buffer_type_mapper.h> +#include <vespa/searchlib/tensor/tensor_buffer_operations.h> +#include <vespa/eval/eval/value_type.h> +#include <vespa/vespalib/gtest/gtest.h> + +using search::tensor::TensorBufferOperations; +using search::tensor::TensorBufferTypeMapper; +using vespalib::eval::ValueType; + +const vespalib::string tensor_type_sparse_spec("tensor(x{})"); +const vespalib::string tensor_type_2d_spec("tensor(x{},y{})"); +const vespalib::string tensor_type_2d_mixed_spec("tensor(x{},y[2])"); +const vespalib::string float_tensor_type_spec("tensor<float>(y{})"); +const vespalib::string tensor_type_dense_spec("tensor(x[2])"); + +struct TestParam +{ + vespalib::string _name; + std::vector<size_t> _array_sizes; + vespalib::string _tensor_type_spec; + TestParam(vespalib::string name, std::vector<size_t> array_sizes, const vespalib::string& tensor_type_spec) + : _name(std::move(name)), + _array_sizes(std::move(array_sizes)), + _tensor_type_spec(tensor_type_spec) + { + } + TestParam(const TestParam&); + ~TestParam(); +}; + +TestParam::TestParam(const TestParam&) = default; + +TestParam::~TestParam() = default; + +std::ostream& operator<<(std::ostream& os, const TestParam& param) +{ + os << param._name; + return os; +} + +class TensorBufferTypeMapperTest : public testing::TestWithParam<TestParam> +{ +protected: + ValueType _tensor_type; + TensorBufferOperations _ops; + TensorBufferTypeMapper _mapper; + TensorBufferTypeMapperTest(); + ~TensorBufferTypeMapperTest() override; + std::vector<size_t> get_array_sizes(); + void select_type_ids(); +}; + +TensorBufferTypeMapperTest::TensorBufferTypeMapperTest() + : testing::TestWithParam<TestParam>(), + _tensor_type(ValueType::from_spec(GetParam()._tensor_type_spec)), + _ops(_tensor_type), + _mapper(GetParam()._array_sizes.size(), &_ops) +{ +} + +TensorBufferTypeMapperTest::~TensorBufferTypeMapperTest() = default; + +std::vector<size_t> +TensorBufferTypeMapperTest::get_array_sizes() +{ + uint32_t max_small_subspaces_type_id = GetParam()._array_sizes.size(); + std::vector<size_t> array_sizes; + for (uint32_t type_id = 1; type_id <= max_small_subspaces_type_id; ++type_id) { + auto num_subspaces = type_id - 1; + array_sizes.emplace_back(_mapper.get_array_size(type_id)); + EXPECT_EQ(_ops.get_array_size(num_subspaces), array_sizes.back()); + } + return array_sizes; +} + +void +TensorBufferTypeMapperTest::select_type_ids() +{ + auto& array_sizes = GetParam()._array_sizes; + uint32_t type_id = 0; + for (auto array_size : array_sizes) { + ++type_id; + EXPECT_EQ(type_id, _mapper.get_type_id(array_size)); + EXPECT_EQ(type_id, _mapper.get_type_id(array_size - 1)); + if (array_size == array_sizes.back()) { + // Fallback to indirect storage, using type id 0 + EXPECT_EQ(0u, _mapper.get_type_id(array_size + 1)); + } else { + EXPECT_EQ(type_id + 1, _mapper.get_type_id(array_size + 1)); + } + } +} + +/* + * For "dense" case, array size for type id 1 is irrelevant, since + * type ids 0 and 1 are not used when storing dense tensors in + * TensorBufferStore. + */ + +VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(TensorBufferTypeMapperMultiTest, + TensorBufferTypeMapperTest, + testing::Values(TestParam("1d", {8, 16, 32, 40, 64}, tensor_type_sparse_spec), + TestParam("1dfloat", {4, 12, 20, 28, 36}, float_tensor_type_spec), + TestParam("2d", {8, 24, 40, 56, 80}, tensor_type_2d_spec), + TestParam("2dmixed", {8, 24, 48, 64, 96}, tensor_type_2d_mixed_spec), + TestParam("dense", {8, 24}, tensor_type_dense_spec)), + testing::PrintToStringParamName()); + +TEST_P(TensorBufferTypeMapperTest, array_sizes_are_calculated) +{ + EXPECT_EQ(GetParam()._array_sizes, get_array_sizes()); +} + +TEST_P(TensorBufferTypeMapperTest, type_ids_are_selected) +{ + select_type_ids(); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchlib/src/vespa/searchlib/memoryindex/feature_store.cpp b/searchlib/src/vespa/searchlib/memoryindex/feature_store.cpp index eb5f0ca843b..72f4a7ae579 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/feature_store.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/feature_store.cpp @@ -68,8 +68,6 @@ FeatureStore::moveFeatures(EntryRef ref, uint64_t bitLen) const uint8_t *src = getBits(ref); uint64_t byteLen = (bitLen + 7) / 8; EntryRef newRef = addFeatures(src, byteLen); - // Mark old features as dead - _store.incDead(ref, byteLen + Aligner::pad(byteLen)); return newRef; } @@ -117,7 +115,6 @@ FeatureStore::add_features_guard_bytes() uint32_t pad = Aligner::pad(len); auto result = _store.rawAllocator<uint8_t>(_typeId).alloc(len + pad); memset(result.data, 0, len + pad); - _store.incDead(result.ref, len + pad); } void diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt index 46bfc0909aa..75f453ddcbc 100644 --- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt @@ -30,7 +30,6 @@ vespa_add_library(searchlib_tensor OBJECT serialized_fast_value_attribute.cpp small_subspaces_buffer_type.cpp streamed_value_saver.cpp - streamed_value_store.cpp tensor_attribute.cpp tensor_buffer_operations.cpp tensor_buffer_store.cpp diff --git a/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp index e2271e63425..7730f340e01 100644 --- a/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp @@ -84,7 +84,7 @@ DirectTensorAttribute::update_tensor(DocId docId, ref = _refVector[docId].load_relaxed(); } if (ref.valid()) { - auto ptr = _direct_store.get_tensor(ref); + auto ptr = _direct_store.get_tensor_ptr(ref); if (ptr) { auto new_value = update.apply_to(*ptr, FastValueBuilderFactory::get()); if (new_value) { @@ -109,7 +109,7 @@ DirectTensorAttribute::getTensor(DocId docId) const ref = acquire_entry_ref(docId); } if (ref.valid()) { - auto ptr = _direct_store.get_tensor(ref); + auto ptr = _direct_store.get_tensor_ptr(ref); if (ptr) { return FastValueBuilderFactory::get().copy(*ptr); } @@ -123,7 +123,7 @@ DirectTensorAttribute::get_tensor_ref(DocId docId) const { if (docId >= getCommittedDocIdLimit()) { return *_emptyTensor; } - auto ptr = _direct_store.get_tensor(acquire_entry_ref(docId)); + auto ptr = _direct_store.get_tensor_ptr(acquire_entry_ref(docId)); if ( ptr == nullptr) { return *_emptyTensor; } return *ptr; diff --git a/searchlib/src/vespa/searchlib/tensor/direct_tensor_saver.cpp b/searchlib/src/vespa/searchlib/tensor/direct_tensor_saver.cpp index 024b2fe5467..0de4491cfcc 100644 --- a/searchlib/src/vespa/searchlib/tensor/direct_tensor_saver.cpp +++ b/searchlib/src/vespa/searchlib/tensor/direct_tensor_saver.cpp @@ -35,7 +35,7 @@ DirectTensorAttributeSaver::onSave(IAttributeSaveTarget &saveTarget) const uint32_t docIdLimit(_refs.size()); vespalib::nbostream stream; for (uint32_t lid = 0; lid < docIdLimit; ++lid) { - const vespalib::eval::Value *tensor = _tensorStore.get_tensor(_refs[lid]); + const vespalib::eval::Value *tensor = _tensorStore.get_tensor_ptr(_refs[lid]); if (tensor) { stream.clear(); encode_value(*tensor, stream); diff --git a/searchlib/src/vespa/searchlib/tensor/direct_tensor_store.h b/searchlib/src/vespa/searchlib/tensor/direct_tensor_store.h index 57e7453ff99..658d1ab0549 100644 --- a/searchlib/src/vespa/searchlib/tensor/direct_tensor_store.h +++ b/searchlib/src/vespa/searchlib/tensor/direct_tensor_store.h @@ -40,7 +40,7 @@ public: ~DirectTensorStore() override; using RefType = TensorStoreType::RefType; - const vespalib::eval::Value * get_tensor(EntryRef ref) const { + const vespalib::eval::Value * get_tensor_ptr(EntryRef ref) const { if (!ref.valid()) { return nullptr; } diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp deleted file mode 100644 index e8752a3145a..00000000000 --- a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp +++ /dev/null @@ -1,288 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "streamed_value_store.h" -#include <vespa/eval/eval/value.h> -#include <vespa/eval/eval/value_codec.h> -#include <vespa/eval/eval/fast_value.hpp> -#include <vespa/eval/streamed/streamed_value_builder_factory.h> -#include <vespa/eval/streamed/streamed_value_view.h> -#include <vespa/vespalib/datastore/buffer_type.hpp> -#include <vespa/vespalib/datastore/compacting_buffers.h> -#include <vespa/vespalib/datastore/compaction_context.h> -#include <vespa/vespalib/datastore/compaction_strategy.h> -#include <vespa/vespalib/datastore/datastore.hpp> -#include <vespa/vespalib/objects/nbostream.h> -#include <vespa/vespalib/util/size_literals.h> -#include <vespa/vespalib/util/typify.h> -#include <vespa/log/log.h> - -LOG_SETUP(".searchlib.tensor.streamed_value_store"); - -using vespalib::datastore::CompactionContext; -using vespalib::datastore::CompactionSpec; -using vespalib::datastore::CompactionStrategy; -using vespalib::datastore::EntryRef; -using vespalib::datastore::Handle; -using vespalib::datastore::ICompactionContext; -using namespace vespalib::eval; -using vespalib::ConstArrayRef; -using vespalib::MemoryUsage; -using vespalib::string_id; -using vespalib::StringIdVector; - -namespace search::tensor { - -//----------------------------------------------------------------------------- - -namespace { - -template <typename CT, typename F> -void each_subspace(const Value &value, size_t num_mapped, size_t dense_size, F f) { - size_t subspace; - std::vector<string_id> addr(num_mapped); - std::vector<string_id*> refs; - refs.reserve(addr.size()); - for (string_id &label: addr) { - refs.push_back(&label); - } - auto cells = value.cells().typify<CT>(); - auto view = value.index().create_view({}); - view->lookup({}); - while (view->next_result(refs, subspace)) { - size_t offset = subspace * dense_size; - f(ConstArrayRef<string_id>(addr), ConstArrayRef<CT>(cells.begin() + offset, dense_size)); - } -} - -using TensorEntry = StreamedValueStore::TensorEntry; - -struct CreateTensorEntry { - template <typename CT> - static TensorEntry::SP invoke(const Value &value, size_t num_mapped, size_t dense_size) { - using EntryImpl = StreamedValueStore::TensorEntryImpl<CT>; - return std::make_shared<EntryImpl>(value, num_mapped, dense_size); - } -}; - -struct MyFastValueView final : Value { - const ValueType &my_type; - FastValueIndex my_index; - TypedCells my_cells; - MyFastValueView(const ValueType &type_ref, const StringIdVector &handle_view, TypedCells cells, size_t num_mapped, size_t num_spaces) - : my_type(type_ref), - my_index(num_mapped, handle_view, num_spaces), - my_cells(cells) - { - const StringIdVector &labels = handle_view; - for (size_t i = 0; i < num_spaces; ++i) { - ConstArrayRef<string_id> addr(labels.data() + (i * num_mapped), num_mapped); - my_index.map.add_mapping(FastAddrMap::hash_labels(addr)); - } - assert(my_index.map.size() == num_spaces); - } - const ValueType &type() const override { return my_type; } - const Value::Index &index() const override { return my_index; } - TypedCells cells() const override { return my_cells; } - MemoryUsage get_memory_usage() const override { - MemoryUsage usage = self_memory_usage<MyFastValueView>(); - usage.merge(my_index.map.estimate_extra_memory_usage()); - return usage; - } -}; - -} // <unnamed> - -//----------------------------------------------------------------------------- - -StreamedValueStore::TensorEntry::~TensorEntry() = default; - -StreamedValueStore::TensorEntry::SP -StreamedValueStore::TensorEntry::create_shared_entry(const Value &value) -{ - size_t num_mapped = value.type().count_mapped_dimensions(); - size_t dense_size = value.type().dense_subspace_size(); - return vespalib::typify_invoke<1,TypifyCellType,CreateTensorEntry>(value.type().cell_type(), value, num_mapped, dense_size); -} - -template <typename CT> -StreamedValueStore::TensorEntryImpl<CT>::TensorEntryImpl(const Value &value, size_t num_mapped, size_t dense_size) - : handles(), - cells() -{ - handles.reserve(num_mapped * value.index().size()); - cells.reserve(dense_size * value.index().size()); - auto store_subspace = [&](auto addr, auto data) { - for (string_id label: addr) { - handles.push_back(label); - } - for (CT entry: data) { - cells.push_back(entry); - } - }; - each_subspace<CT>(value, num_mapped, dense_size, store_subspace); -} - -template <typename CT> -Value::UP -StreamedValueStore::TensorEntryImpl<CT>::create_fast_value_view(const ValueType &type_ref) const -{ - size_t num_mapped = type_ref.count_mapped_dimensions(); - size_t dense_size = type_ref.dense_subspace_size(); - size_t num_spaces = cells.size() / dense_size; - assert(dense_size * num_spaces == cells.size()); - assert(num_mapped * num_spaces == handles.view().size()); - return std::make_unique<MyFastValueView>(type_ref, handles.view(), TypedCells(cells), num_mapped, num_spaces); -} - -template <typename CT> -void -StreamedValueStore::TensorEntryImpl<CT>::encode_value(const ValueType &type, vespalib::nbostream &target) const -{ - size_t num_mapped = type.count_mapped_dimensions(); - size_t dense_size = type.dense_subspace_size(); - size_t num_spaces = cells.size() / dense_size; - assert(dense_size * num_spaces == cells.size()); - assert(num_mapped * num_spaces == handles.view().size()); - StreamedValueView my_value(type, num_mapped, TypedCells(cells), num_spaces, handles.view()); - ::vespalib::eval::encode_value(my_value, target); -} - -template <typename CT> -MemoryUsage -StreamedValueStore::TensorEntryImpl<CT>::get_memory_usage() const -{ - MemoryUsage usage = self_memory_usage<TensorEntryImpl<CT>>(); - usage.merge(vector_extra_memory_usage(handles.view())); - usage.merge(vector_extra_memory_usage(cells)); - return usage; -} - -template <typename CT> -StreamedValueStore::TensorEntryImpl<CT>::~TensorEntryImpl() = default; - -//----------------------------------------------------------------------------- - -constexpr size_t MIN_BUFFER_ARRAYS = 8_Ki; - -StreamedValueStore::TensorBufferType::TensorBufferType() noexcept - : ParentType(1, MIN_BUFFER_ARRAYS, TensorStoreType::RefType::offsetSize()) -{ -} - -void -StreamedValueStore::TensorBufferType::cleanHold(void* buffer, size_t offset, ElemCount num_elems, CleanContext clean_ctx) -{ - TensorEntry::SP* elem = static_cast<TensorEntry::SP*>(buffer) + offset; - const auto& empty = empty_entry(); - for (size_t i = 0; i < num_elems; ++i) { - clean_ctx.extraBytesCleaned((*elem)->get_memory_usage().allocatedBytes()); - *elem = empty; - ++elem; - } -} - -StreamedValueStore::StreamedValueStore(const ValueType &tensor_type) - : TensorStore(_concrete_store), - _concrete_store(std::make_unique<TensorBufferType>()), - _tensor_type(tensor_type) -{ - _concrete_store.enableFreeLists(); -} - -StreamedValueStore::~StreamedValueStore() = default; - -EntryRef -StreamedValueStore::add_entry(TensorEntry::SP tensor) -{ - auto ref = _concrete_store.addEntry(tensor); - auto& state = _concrete_store.getBufferState(RefType(ref).bufferId()); - state.stats().inc_extra_used_bytes(tensor->get_memory_usage().allocatedBytes()); - return ref; -} - -const StreamedValueStore::TensorEntry * -StreamedValueStore::get_tensor_entry(EntryRef ref) const -{ - if (!ref.valid()) { - return nullptr; - } - const auto& entry = _concrete_store.getEntry(ref); - assert(entry); - return entry.get(); -} - -std::unique_ptr<vespalib::eval::Value> -StreamedValueStore::get_tensor(EntryRef ref) const -{ - if (const auto * ptr = get_tensor_entry(ref)) { - return ptr->create_fast_value_view(_tensor_type); - } - return {}; -} - -void -StreamedValueStore::holdTensor(EntryRef ref) -{ - if (!ref.valid()) { - return; - } - const auto& tensor = _concrete_store.getEntry(ref); - assert(tensor); - _concrete_store.holdElem(ref, 1, tensor->get_memory_usage().allocatedBytes()); -} - -TensorStore::EntryRef -StreamedValueStore::move(EntryRef ref) -{ - if (!ref.valid()) { - return EntryRef(); - } - const auto& old_tensor = _concrete_store.getEntry(ref); - assert(old_tensor); - auto new_ref = add_entry(old_tensor); - _concrete_store.holdElem(ref, 1, old_tensor->get_memory_usage().allocatedBytes()); - return new_ref; -} - -vespalib::MemoryUsage -StreamedValueStore::update_stat(const CompactionStrategy& compaction_strategy) -{ - auto memory_usage = _store.getMemoryUsage(); - _compaction_spec = CompactionSpec(compaction_strategy.should_compact_memory(memory_usage), false); - return memory_usage; -} - -std::unique_ptr<ICompactionContext> -StreamedValueStore::start_compact(const CompactionStrategy& compaction_strategy) -{ - auto compacting_buffers = _store.start_compact_worst_buffers(_compaction_spec, compaction_strategy); - return std::make_unique<CompactionContext>(*this, std::move(compacting_buffers)); -} - -bool -StreamedValueStore::encode_stored_tensor(EntryRef ref, vespalib::nbostream &target) const -{ - if (const auto * entry = get_tensor_entry(ref)) { - entry->encode_value(_tensor_type, target); - return true; - } else { - return false; - } -} - -TensorStore::EntryRef -StreamedValueStore::store_tensor(const Value &tensor) -{ - assert(tensor.type() == _tensor_type); - return add_entry(TensorEntry::create_shared_entry(tensor)); -} - -TensorStore::EntryRef -StreamedValueStore::store_encoded_tensor(vespalib::nbostream &encoded) -{ - const auto &factory = StreamedValueBuilderFactory::get(); - auto val = vespalib::eval::decode_value(encoded, factory); - return store_tensor(*val); -} - -} diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h deleted file mode 100644 index 58137e316dd..00000000000 --- a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "tensor_store.h" -#include <vespa/eval/eval/value_type.h> -#include <vespa/eval/eval/value.h> -#include <vespa/eval/streamed/streamed_value.h> -#include <vespa/vespalib/datastore/datastore.h> -#include <vespa/vespalib/objects/nbostream.h> -#include <vespa/vespalib/util/shared_string_repo.h> - -namespace search::tensor { - -/** - * Class for StreamedValue tensors in memory. - */ -class StreamedValueStore : public TensorStore { -public: - using Value = vespalib::eval::Value; - using ValueType = vespalib::eval::ValueType; - using Handles = vespalib::SharedStringRepo::Handles; - using MemoryUsage = vespalib::MemoryUsage; - - // interface for tensor entries - struct TensorEntry { - using SP = std::shared_ptr<TensorEntry>; - virtual Value::UP create_fast_value_view(const ValueType &type_ref) const = 0; - virtual void encode_value(const ValueType &type, vespalib::nbostream &target) const = 0; - virtual MemoryUsage get_memory_usage() const = 0; - virtual ~TensorEntry(); - static TensorEntry::SP create_shared_entry(const Value &value); - }; - - // implementation of tensor entries - template <typename CT> - struct TensorEntryImpl : public TensorEntry { - Handles handles; - std::vector<CT> cells; - TensorEntryImpl(const Value &value, size_t num_mapped, size_t dense_size); - Value::UP create_fast_value_view(const ValueType &type_ref) const override; - void encode_value(const ValueType &type, vespalib::nbostream &target) const override; - MemoryUsage get_memory_usage() const override; - ~TensorEntryImpl() override; - }; - -private: - // Note: Must use SP (instead of UP) because of fallbackCopy() and initializeReservedElements() in BufferType, - // and implementation of move(). - using TensorStoreType = vespalib::datastore::DataStore<TensorEntry::SP>; - - class TensorBufferType : public vespalib::datastore::BufferType<TensorEntry::SP> { - private: - using ParentType = BufferType<TensorEntry::SP>; - using ParentType::empty_entry; - using CleanContext = typename ParentType::CleanContext; - public: - TensorBufferType() noexcept; - void cleanHold(void* buffer, size_t offset, ElemCount num_elems, CleanContext clean_ctx) override; - }; - TensorStoreType _concrete_store; - const vespalib::eval::ValueType _tensor_type; - EntryRef add_entry(TensorEntry::SP tensor); - const TensorEntry* get_tensor_entry(EntryRef ref) const; -public: - StreamedValueStore(const vespalib::eval::ValueType &tensor_type); - ~StreamedValueStore() override; - - using RefType = TensorStoreType::RefType; - - void holdTensor(EntryRef ref) override; - EntryRef move(EntryRef ref) override; - vespalib::MemoryUsage update_stat(const vespalib::datastore::CompactionStrategy& compaction_strategy) override; - std::unique_ptr<vespalib::datastore::ICompactionContext> start_compact(const vespalib::datastore::CompactionStrategy& compaction_strategy) override; - - std::unique_ptr<vespalib::eval::Value> get_tensor(EntryRef ref) const; - bool encode_stored_tensor(EntryRef ref, vespalib::nbostream &target) const; - - EntryRef store_tensor(const vespalib::eval::Value &tensor); - EntryRef store_encoded_tensor(vespalib::nbostream &encoded); -}; - - -} diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index 7aafb7c364e..b84a90465ea 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -54,6 +54,7 @@ vespa_define_module( src/tests/data/smart_buffer src/tests/datastore/array_store src/tests/datastore/array_store_config + src/tests/datastore/buffer_stats src/tests/datastore/buffer_type src/tests/datastore/compact_buffer_candidates src/tests/datastore/datastore diff --git a/vespalib/src/tests/datastore/buffer_stats/CMakeLists.txt b/vespalib/src/tests/datastore/buffer_stats/CMakeLists.txt new file mode 100644 index 00000000000..2463f584133 --- /dev/null +++ b/vespalib/src/tests/datastore/buffer_stats/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_datastore_buffer_stats_test_app TEST + SOURCES + buffer_stats_test.cpp + DEPENDS + vespalib + GTest::GTest +) +vespa_add_test(NAME vespalib_datastore_buffer_stats_test_app COMMAND vespalib_datastore_buffer_stats_test_app) diff --git a/vespalib/src/tests/datastore/buffer_stats/buffer_stats_test.cpp b/vespalib/src/tests/datastore/buffer_stats/buffer_stats_test.cpp new file mode 100644 index 00000000000..09b2590a5f3 --- /dev/null +++ b/vespalib/src/tests/datastore/buffer_stats/buffer_stats_test.cpp @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/datastore/buffer_stats.h> +#include <vespa/vespalib/datastore/memory_stats.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace vespalib::datastore; + +TEST(BufferStatsTest, buffer_stats_to_memory_stats) +{ + InternalBufferStats buf; + buf.set_alloc_elems(17); + buf.pushed_back(7); + buf.set_dead_elems(5); + buf.set_hold_elems(3); + buf.inc_extra_used_bytes(13); + buf.inc_extra_hold_bytes(11); + + MemoryStats mem; + constexpr size_t es = 8; + buf.add_to_mem_stats(es, mem); + + EXPECT_EQ(17, mem._allocElems); + EXPECT_EQ(7, mem._usedElems); + EXPECT_EQ(5, mem._deadElems); + EXPECT_EQ(3, mem._holdElems); + EXPECT_EQ(17 * es + 13, mem._allocBytes); + EXPECT_EQ(7 * es + 13, mem._usedBytes); + EXPECT_EQ(5 * es, mem._deadBytes); + EXPECT_EQ(3 * es + 11, mem._holdBytes); +} + +GTEST_MAIN_RUN_ALL_TESTS() + diff --git a/vespalib/src/tests/datastore/datastore/datastore_test.cpp b/vespalib/src/tests/datastore/datastore/datastore_test.cpp index bf389e5e78e..2c4e68467da 100644 --- a/vespalib/src/tests/datastore/datastore/datastore_test.cpp +++ b/vespalib/src/tests/datastore/datastore/datastore_test.cpp @@ -17,7 +17,6 @@ using vespalib::alloc::MemoryAllocator; class MyStore : public DataStore<int, EntryRefT<3, 2> > { private: using ParentType = DataStore<int, EntryRefT<3, 2> >; - using ParentType::_primary_buffer_ids; public: MyStore() {} explicit MyStore(std::unique_ptr<BufferType<int>> type) @@ -35,9 +34,6 @@ public: void trimElemHoldList(generation_t usedGen) override { ParentType::trimElemHoldList(usedGen); } - void incDead(EntryRef ref, uint64_t dead) { - ParentType::incDead(ref, dead); - } void ensureBufferCapacity(size_t sizeNeeded) { ParentType::ensureBufferCapacity(0, sizeNeeded); } @@ -47,7 +43,7 @@ public: void switch_primary_buffer() { ParentType::switch_primary_buffer(0, 0u); } - size_t primary_buffer_id() const { return _primary_buffer_ids[0]; } + size_t primary_buffer_id() const { return get_primary_buffer_id(0); } BufferState& get_active_buffer_state() { return ParentType::getBufferState(primary_buffer_id()); } @@ -429,11 +425,6 @@ TEST(DataStoreTest, require_that_memory_stats_are_calculated) m._usedElems++; assertMemStats(m, s.getMemStats()); - // inc dead - s.incDead(r, 1); - m._deadElems++; - assertMemStats(m, s.getMemStats()); - // hold buffer s.addEntry(20); s.addEntry(30); @@ -474,7 +465,7 @@ TEST(DataStoreTest, require_that_memory_stats_are_calculated) { // increase extra hold bytes auto prev_stats = s.getMemStats(); - s.get_active_buffer_state().stats().inc_extra_hold_bytes(30); + s.get_active_buffer_state().hold_elems(0, 30); auto curr_stats = s.getMemStats(); EXPECT_EQ(prev_stats._holdBytes + 30, curr_stats._holdBytes); } @@ -487,7 +478,6 @@ TEST(DataStoreTest, require_that_memory_usage_is_calculated) s.addEntry(20); s.addEntry(30); s.addEntry(40); - s.incDead(r, 1); s.holdBuffer(r.bufferId()); s.transferHoldLists(100); vespalib::MemoryUsage m = s.getMemoryUsage(); @@ -698,9 +688,9 @@ void test_free_element_to_held_buffer(bool direct, bool before_hold_buffer) s.holdBuffer(0); // hold last buffer if (!before_hold_buffer) { if (direct) { - ASSERT_DEATH({ s.freeElem(ref, 1); }, "state.isOnHold\\(\\) && was_held"); + ASSERT_DEATH({ s.freeElem(ref, 1); }, "isOnHold\\(\\) && was_held"); } else { - ASSERT_DEATH({ s.holdElem(ref, 1); }, "state.isActive\\(\\)"); + ASSERT_DEATH({ s.holdElem(ref, 1); }, "isActive\\(\\)"); } } s.transferHoldLists(100); diff --git a/vespalib/src/vespa/vespalib/datastore/buffer_stats.cpp b/vespalib/src/vespa/vespalib/datastore/buffer_stats.cpp index 0d367cf9835..8d97414626e 100644 --- a/vespalib/src/vespa/vespalib/datastore/buffer_stats.cpp +++ b/vespalib/src/vespa/vespalib/datastore/buffer_stats.cpp @@ -16,14 +16,6 @@ BufferStats::BufferStats() } void -BufferStats::dec_hold_elems(size_t value) -{ - ElemCount elems = hold_elems(); - assert(elems >= value); - _hold_elems.store(elems - value, std::memory_order_relaxed); -} - -void BufferStats::add_to_mem_stats(size_t element_size, MemoryStats& stats) const { size_t extra_used = extra_used_bytes(); @@ -37,13 +29,13 @@ BufferStats::add_to_mem_stats(size_t element_size, MemoryStats& stats) const stats._holdBytes += (hold_elems() * element_size) + extra_hold_bytes(); } -MutableBufferStats::MutableBufferStats() +InternalBufferStats::InternalBufferStats() : BufferStats() { } void -MutableBufferStats::clear() +InternalBufferStats::clear() { _alloc_elems.store(0, std::memory_order_relaxed); _used_elems.store(0, std::memory_order_relaxed); @@ -53,5 +45,13 @@ MutableBufferStats::clear() _extra_hold_bytes.store(0, std::memory_order_relaxed); } +void +InternalBufferStats::dec_hold_elems(size_t value) +{ + ElemCount elems = hold_elems(); + assert(elems >= value); + _hold_elems.store(elems - value, std::memory_order_relaxed); +} + } diff --git a/vespalib/src/vespa/vespalib/datastore/buffer_stats.h b/vespalib/src/vespa/vespalib/datastore/buffer_stats.h index 0df74c0a79d..66f8b532c41 100644 --- a/vespalib/src/vespa/vespalib/datastore/buffer_stats.h +++ b/vespalib/src/vespa/vespalib/datastore/buffer_stats.h @@ -47,11 +47,7 @@ public: size_t extra_used_bytes() const { return _extra_used_bytes.load(std::memory_order_relaxed); } size_t extra_hold_bytes() const { return _extra_hold_bytes.load(std::memory_order_relaxed); } - void inc_dead_elems(size_t value) { _dead_elems.store(dead_elems() + value, std::memory_order_relaxed); } - void inc_hold_elems(size_t value) { _hold_elems.store(hold_elems() + value, std::memory_order_relaxed); } - void dec_hold_elems(size_t value); void inc_extra_used_bytes(size_t value) { _extra_used_bytes.store(extra_used_bytes() + value, std::memory_order_relaxed); } - void inc_extra_hold_bytes(size_t value) { _extra_hold_bytes.store(extra_hold_bytes() + value, std::memory_order_relaxed); } void add_to_mem_stats(size_t element_size, MemoryStats& stats) const; }; @@ -59,13 +55,17 @@ public: /** * Provides low-level access to buffer stats for integration in BufferState. */ -class MutableBufferStats : public BufferStats { +class InternalBufferStats : public BufferStats { public: - MutableBufferStats(); + InternalBufferStats(); void clear(); void set_alloc_elems(size_t value) { _alloc_elems.store(value, std::memory_order_relaxed); } void set_dead_elems(size_t value) { _dead_elems.store(value, std::memory_order_relaxed); } void set_hold_elems(size_t value) { _hold_elems.store(value, std::memory_order_relaxed); } + void inc_dead_elems(size_t value) { _dead_elems.store(dead_elems() + value, std::memory_order_relaxed); } + void inc_hold_elems(size_t value) { _hold_elems.store(hold_elems() + value, std::memory_order_relaxed); } + void dec_hold_elems(size_t value); + void inc_extra_hold_bytes(size_t value) { _extra_hold_bytes.store(extra_hold_bytes() + value, std::memory_order_relaxed); } std::atomic<ElemCount>& used_elems_ref() { return _used_elems; } std::atomic<ElemCount>& dead_elems_ref() { return _dead_elems; } std::atomic<size_t>& extra_used_bytes_ref() { return _extra_used_bytes; } diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp index 35455a193d2..e9ff243fb08 100644 --- a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp +++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp @@ -176,6 +176,39 @@ BufferState::disableElemHoldList() _disableElemHoldList = true; } +bool +BufferState::hold_elems(size_t num_elems, size_t extra_bytes) +{ + assert(isActive()); + if (_disableElemHoldList) { + // The elements are directly marked as dead as they are not put on hold. + _stats.inc_dead_elems(num_elems); + return true; + } + _stats.inc_hold_elems(num_elems); + _stats.inc_extra_hold_bytes(extra_bytes); + return false; +} + +void +BufferState::free_elems(EntryRef ref, size_t num_elems, bool was_held, size_t ref_offset) +{ + if (isActive()) { + if (_free_list.enabled() && (num_elems == getArraySize())) { + _free_list.push_entry(ref); + } + } else { + assert(isOnHold() && was_held); + } + _stats.inc_dead_elems(num_elems); + if (was_held) { + _stats.dec_hold_elems(num_elems); + } + getTypeHandler()->cleanHold(_buffer.get(), (ref_offset * _arraySize), num_elems, + BufferTypeBase::CleanContext(_stats.extra_used_bytes_ref(), + _stats.extra_hold_bytes_ref())); +} + void BufferState::fallbackResize(uint32_t bufferId, size_t elementsNeeded, diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.h b/vespalib/src/vespa/vespalib/datastore/bufferstate.h index c35a51b0c99..0acad7d3ab1 100644 --- a/vespalib/src/vespa/vespalib/datastore/bufferstate.h +++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.h @@ -39,7 +39,7 @@ public: }; private: - MutableBufferStats _stats; + InternalBufferStats _stats; BufferFreeList _free_list; std::atomic<BufferTypeBase*> _typeHandler; Alloc _buffer; @@ -83,24 +83,34 @@ public: void onFree(std::atomic<void*>& buffer); /** - * Disable hold of elements, just mark then as dead without cleanup. + * Disable hold of elements, just mark elements as dead without cleanup. * Typically used when tearing down data structure in a controlled manner. */ void disableElemHoldList(); + /** + * Update stats to reflect that the given elements are put on hold. + * Returns true if element hold list is disabled for this buffer. + */ + bool hold_elems(size_t num_elems, size_t extra_bytes); + + /** + * Free the given elements and update stats accordingly. + * + * The given entry ref is put on the free list (if enabled). + * Hold cleaning of elements is executed on the buffer type. + */ + void free_elems(EntryRef ref, size_t num_elems, bool was_held, size_t ref_offset); + BufferStats& stats() { return _stats; } const BufferStats& stats() const { return _stats; } - BufferFreeList& free_list() { return _free_list; } - const BufferFreeList& free_list() const { return _free_list; } + + void enable_free_list(FreeList& type_free_list) { _free_list.enable(type_free_list); } + void disable_free_list() { _free_list.disable(); } size_t size() const { return _stats.size(); } size_t capacity() const { return _stats.capacity(); } size_t remaining() const { return _stats.remaining(); } - void cleanHold(void *buffer, size_t offset, ElemCount numElems) { - getTypeHandler()->cleanHold(buffer, offset, numElems, - BufferTypeBase::CleanContext(_stats.extra_used_bytes_ref(), - _stats.extra_hold_bytes_ref())); - } void dropBuffer(uint32_t buffer_id, std::atomic<void*>& buffer); uint32_t getTypeId() const { return _typeId; } uint32_t getArraySize() const { return _arraySize; } @@ -119,7 +129,6 @@ public: const BufferTypeBase *getTypeHandler() const { return _typeHandler.load(std::memory_order_relaxed); } BufferTypeBase *getTypeHandler() { return _typeHandler.load(std::memory_order_relaxed); } - bool hasDisabledElemHoldList() const { return _disableElemHoldList; } void resume_primary_buffer(uint32_t buffer_id); }; diff --git a/vespalib/src/vespa/vespalib/datastore/datastore.h b/vespalib/src/vespa/vespalib/datastore/datastore.h index 86c39b547d3..6ff71238a89 100644 --- a/vespalib/src/vespa/vespalib/datastore/datastore.h +++ b/vespalib/src/vespa/vespalib/datastore/datastore.h @@ -38,17 +38,6 @@ public: ~DataStoreT() override; /** - * Increase number of dead elements in buffer. - * - * @param ref Reference to dead stored features - * @param dead Number of newly dead elements - */ - void incDead(EntryRef ref, size_t deadElems) { - RefType intRef(ref); - DataStoreBase::incDead(intRef.bufferId(), deadElems); - } - - /** * Free element(s). */ void freeElem(EntryRef ref, size_t numElems) { free_elem_internal(ref, numElems, false); } @@ -97,7 +86,6 @@ class DataStore : public DataStoreT<RefT> protected: typedef DataStoreT<RefT> ParentType; using ParentType::ensureBufferCapacity; - using ParentType::_primary_buffer_ids; using ParentType::getEntry; using ParentType::dropBuffers; using ParentType::init_primary_buffers; diff --git a/vespalib/src/vespa/vespalib/datastore/datastore.hpp b/vespalib/src/vespa/vespalib/datastore/datastore.hpp index 90f7507f80f..18abe29d04f 100644 --- a/vespalib/src/vespa/vespalib/datastore/datastore.hpp +++ b/vespalib/src/vespa/vespalib/datastore/datastore.hpp @@ -26,19 +26,7 @@ DataStoreT<RefT>::free_elem_internal(EntryRef ref, size_t numElems, bool was_hel { RefType intRef(ref); BufferState &state = getBufferState(intRef.bufferId()); - if (state.isActive()) { - if (state.free_list().enabled() && (numElems == state.getArraySize())) { - state.free_list().push_entry(ref); - } - } else { - assert(state.isOnHold() && was_held); - } - state.stats().inc_dead_elems(numElems); - if (was_held) { - state.stats().dec_hold_elems(numElems); - } - state.cleanHold(getBuffer(intRef.bufferId()), - intRef.offset() * state.getArraySize(), numElems); + state.free_elems(ref, numElems, was_held, intRef.offset()); } template <typename RefT> @@ -47,33 +35,27 @@ DataStoreT<RefT>::holdElem(EntryRef ref, size_t numElems, size_t extraBytes) { RefType intRef(ref); BufferState &state = getBufferState(intRef.bufferId()); - assert(state.isActive()); - if (state.hasDisabledElemHoldList()) { - state.stats().inc_dead_elems(numElems); - return; + if (!state.hold_elems(numElems, extraBytes)) { + _elemHold1List.push_back(ElemHold1ListElem(ref, numElems)); } - _elemHold1List.push_back(ElemHold1ListElem(ref, numElems)); - state.stats().inc_hold_elems(numElems); - state.stats().inc_extra_hold_bytes(extraBytes); } template <typename RefT> void DataStoreT<RefT>::trimElemHoldList(generation_t usedGen) { - ElemHold2List &elemHold2List = _elemHold2List; - - ElemHold2List::iterator it(elemHold2List.begin()); - ElemHold2List::iterator ite(elemHold2List.end()); + auto it = _elemHold2List.begin(); + auto ite = _elemHold2List.end(); uint32_t freed = 0; for (; it != ite; ++it) { - if (static_cast<sgeneration_t>(it->_generation - usedGen) >= 0) + if (static_cast<sgeneration_t>(it->_generation - usedGen) >= 0) { break; + } free_elem_internal(it->_ref, it->_len, true); ++freed; } if (freed != 0) { - elemHold2List.erase(elemHold2List.begin(), it); + _elemHold2List.erase(_elemHold2List.begin(), it); } } @@ -81,14 +63,12 @@ template <typename RefT> void DataStoreT<RefT>::clearElemHoldList() { - ElemHold2List &elemHold2List = _elemHold2List; - - ElemHold2List::iterator it(elemHold2List.begin()); - ElemHold2List::iterator ite(elemHold2List.end()); + auto it = _elemHold2List.begin(); + auto ite = _elemHold2List.end(); for (; it != ite; ++it) { free_elem_internal(it->_ref, it->_len, true); } - elemHold2List.clear(); + _elemHold2List.clear(); } template <typename RefT> diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp index 302a1b49219..dd6c767e9c6 100644 --- a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp +++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp @@ -223,9 +223,8 @@ DataStoreBase::addType(BufferTypeBase *typeHandler) void DataStoreBase::transferElemHoldList(generation_t generation) { - ElemHold2List &elemHold2List = _elemHold2List; - for (const ElemHold1ListElem & elemHold1 : _elemHold1List) { - elemHold2List.push_back(ElemHold2ListElem(elemHold1, generation)); + for (const auto& elemHold1 : _elemHold1List) { + _elemHold2List.push_back(ElemHold2ListElem(elemHold1, generation)); } _elemHold1List.clear(); } @@ -289,18 +288,18 @@ DataStoreBase::holdBuffer(uint32_t bufferId) { _states[bufferId].onHold(bufferId); size_t holdBytes = 0u; // getMemStats() still accounts held buffers - GenerationHeldBase::UP hold(new BufferHold(holdBytes, *this, bufferId)); + auto hold = std::make_unique<BufferHold>(holdBytes, *this, bufferId); _genHolder.hold(std::move(hold)); } void DataStoreBase::enableFreeLists() { - for (BufferState & bState : _states) { + for (auto& bState : _states) { if (!bState.isActive() || bState.getCompacting()) { continue; } - bState.free_list().enable(_free_lists[bState.getTypeId()]); + bState.enable_free_list(_free_lists[bState.getTypeId()]); } _freeListsEnabled = true; } @@ -308,8 +307,8 @@ DataStoreBase::enableFreeLists() void DataStoreBase::disableFreeLists() { - for (BufferState & bState : _states) { - bState.free_list().disable(); + for (auto& bState : _states) { + bState.disable_free_list(); } _freeListsEnabled = false; } @@ -321,17 +320,11 @@ DataStoreBase::enableFreeList(uint32_t bufferId) if (_freeListsEnabled && state.isActive() && !state.getCompacting()) { - state.free_list().enable(_free_lists[state.getTypeId()]); + state.enable_free_list(_free_lists[state.getTypeId()]); } } void -DataStoreBase::disableFreeList(uint32_t bufferId) -{ - _states[bufferId].free_list().disable(); -} - -void DataStoreBase::disableElemHoldList() { for (auto &state : _states) { @@ -346,9 +339,9 @@ DataStoreBase::getMemStats() const { MemoryStats stats; - for (const BufferState & bState: _states) { + for (const auto& bState: _states) { auto typeHandler = bState.getTypeHandler(); - BufferState::State state = bState.getState(); + auto state = bState.getState(); if ((state == BufferState::State::FREE) || (typeHandler == nullptr)) { ++stats._freeBuffers; } else if (state == BufferState::State::ACTIVE) { @@ -376,7 +369,7 @@ DataStoreBase::getAddressSpaceUsage() const size_t usedArrays = 0; size_t deadArrays = 0; size_t limitArrays = 0; - for (const BufferState & bState: _states) { + for (const auto& bState: _states) { if (bState.isActive()) { uint32_t arraySize = bState.getArraySize(); usedArrays += bState.size() / arraySize; @@ -392,7 +385,7 @@ DataStoreBase::getAddressSpaceUsage() const LOG_ABORT("should not be reached"); } } - return vespalib::AddressSpace(usedArrays, deadArrays, limitArrays); + return {usedArrays, deadArrays, limitArrays}; } void @@ -429,12 +422,11 @@ DataStoreBase::fallbackResize(uint32_t bufferId, size_t elemsNeeded) state.fallbackResize(bufferId, elemsNeeded, _buffers[bufferId].get_atomic_buffer(), toHoldBuffer); - GenerationHeldBase::UP - hold(new FallbackHold(oldAllocElems * elementSize, - std::move(toHoldBuffer), - oldUsedElems, - state.getTypeHandler(), - state.getTypeId())); + auto hold = std::make_unique<FallbackHold>(oldAllocElems * elementSize, + std::move(toHoldBuffer), + oldUsedElems, + state.getTypeHandler(), + state.getTypeId()); if (!_initializing) { _genHolder.hold(std::move(hold)); } @@ -452,7 +444,7 @@ DataStoreBase::markCompacting(uint32_t bufferId) assert(!state.getCompacting()); state.setCompacting(); state.disableElemHoldList(); - state.free_list().disable(); + state.disable_free_list(); inc_compaction_count(); } @@ -460,9 +452,15 @@ std::unique_ptr<CompactingBuffers> DataStoreBase::start_compact_worst_buffers(CompactionSpec compaction_spec, const CompactionStrategy& compaction_strategy) { // compact memory usage - CompactBufferCandidates elem_buffers(_numBuffers, compaction_strategy.get_max_buffers(), compaction_strategy.get_active_buffers_ratio(), compaction_strategy.getMaxDeadBytesRatio() / 2, CompactionStrategy::DEAD_BYTES_SLACK); + CompactBufferCandidates elem_buffers(_numBuffers, compaction_strategy.get_max_buffers(), + compaction_strategy.get_active_buffers_ratio(), + compaction_strategy.getMaxDeadBytesRatio() / 2, + CompactionStrategy::DEAD_BYTES_SLACK); // compact address space - CompactBufferCandidates array_buffers(_numBuffers, compaction_strategy.get_max_buffers(), compaction_strategy.get_active_buffers_ratio(), compaction_strategy.getMaxDeadAddressSpaceRatio() / 2, CompactionStrategy::DEAD_ADDRESS_SPACE_SLACK); + CompactBufferCandidates array_buffers(_numBuffers, compaction_strategy.get_max_buffers(), + compaction_strategy.get_active_buffers_ratio(), + compaction_strategy.getMaxDeadAddressSpaceRatio() / 2, + CompactionStrategy::DEAD_ADDRESS_SPACE_SLACK); uint32_t free_buffers = 0; for (uint32_t bufferId = 0; bufferId < _numBuffers; ++bufferId) { const auto &state = getBufferState(bufferId); diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.h b/vespalib/src/vespa/vespalib/datastore/datastorebase.h index 4038e12efee..28c47207a9f 100644 --- a/vespalib/src/vespa/vespalib/datastore/datastorebase.h +++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.h @@ -25,9 +25,10 @@ class CompactionStrategy; */ class DataStoreBase { -public: +protected: /** - * Hold list before freeze, before knowing how long elements must be held. + * Hold list element used in the first phase of holding (before freeze), + * before knowing how long elements must be held. */ class ElemHold1ListElem { @@ -41,10 +42,27 @@ public: { } }; -protected: using generation_t = vespalib::GenerationHandler::generation_t; using sgeneration_t = vespalib::GenerationHandler::sgeneration_t; + /** + * Hold list element used in the second phase of holding (at freeze), + * when knowing how long elements must be held. + */ + class ElemHold2ListElem : public ElemHold1ListElem + { + public: + generation_t _generation; + + ElemHold2ListElem(const ElemHold1ListElem &hold1, generation_t generation) + : ElemHold1ListElem(hold1), + _generation(generation) + { } + }; + + using ElemHold1List = vespalib::Array<ElemHold1ListElem>; + using ElemHold2List = std::deque<ElemHold2ListElem>; + private: class BufferAndTypeId { public: @@ -60,32 +78,16 @@ private: uint32_t _typeId; }; std::vector<BufferAndTypeId> _buffers; // For fast mapping with known types -protected: + // Provides a mapping from typeId -> primary buffer for that type. // The primary buffer is used for allocations of new element(s) if no available slots are found in free lists. std::vector<uint32_t> _primary_buffer_ids; +protected: void* getBuffer(uint32_t bufferId) { return _buffers[bufferId].get_buffer_relaxed(); } /** - * Hold list at freeze, when knowing how long elements must be held - */ - class ElemHold2ListElem : public ElemHold1ListElem - { - public: - generation_t _generation; - - ElemHold2ListElem(const ElemHold1ListElem &hold1, generation_t generation) - : ElemHold1ListElem(hold1), - _generation(generation) - { } - }; - - using ElemHold1List = vespalib::Array<ElemHold1ListElem>; - using ElemHold2List = std::deque<ElemHold2ListElem>; - - /** - * Class used to hold the old buffer as part of fallbackResize(). + * Class used to hold the entire old buffer as part of fallbackResize(). */ class FallbackHold : public vespalib::GenerationHeldBase { @@ -129,6 +131,7 @@ protected: virtual ~DataStoreBase(); +private: /** * Get the next buffer id after the given buffer id. */ @@ -138,6 +141,7 @@ protected: ret = 0; return ret; } +protected: /** * Get the primary buffer for the given type id. @@ -156,6 +160,7 @@ protected: virtual void clearElemHoldList() = 0; void markCompacting(uint32_t bufferId); + public: uint32_t addType(BufferTypeBase *typeHandler); void init_primary_buffers(); @@ -191,9 +196,11 @@ public: */ void switch_primary_buffer(uint32_t typeId, size_t elemsNeeded); +private: bool consider_grow_active_buffer(uint32_t type_id, size_t elems_needed); void switch_or_grow_primary_buffer(uint32_t typeId, size_t elemsNeeded); +public: vespalib::MemoryUsage getMemoryUsage() const; vespalib::AddressSpace getAddressSpaceUsage() const; @@ -205,6 +212,8 @@ public: const BufferState &getBufferState(uint32_t bufferId) const { return _states[bufferId]; } BufferState &getBufferState(uint32_t bufferId) { return _states[bufferId]; } uint32_t getNumBuffers() const { return _numBuffers; } + +private: bool hasElemHold1() const { return !_elemHold1List.empty(); } /** @@ -212,16 +221,19 @@ public: */ void transferElemHoldList(generation_t generation); +public: /** * Transfer holds from hold1 to hold2 lists, assigning generation. */ void transferHoldLists(generation_t generation); +private: /** * Hold of buffer has ended. */ void doneHoldBuffer(uint32_t bufferId); +public: /** * Trim hold lists, freeing buffers that no longer needs to be held. * @@ -253,12 +265,6 @@ public: void dropBuffers(); - - void incDead(uint32_t bufferId, size_t deadElems) { - BufferState &state = _states[bufferId]; - state.stats().inc_dead_elems(deadElems); - } - /** * Enable free list management. * This only works for fixed size elements. @@ -270,16 +276,14 @@ public: */ void disableFreeLists(); +private: /** * Enable free list management. * This only works for fixed size elements. */ void enableFreeList(uint32_t bufferId); - /** - * Disable free list management. - */ - void disableFreeList(uint32_t bufferId); +public: void disableElemHoldList(); bool has_free_lists_enabled() const { return _freeListsEnabled; } @@ -312,14 +316,18 @@ private: void onActive(uint32_t bufferId, uint32_t typeId, size_t elemsNeeded); void inc_hold_buffer_count(); + public: uint32_t getTypeId(uint32_t bufferId) const { return _buffers[bufferId].getTypeId(); } void finishCompact(const std::vector<uint32_t> &toHold); + +private: void fallbackResize(uint32_t bufferId, size_t elementsNeeded); +public: vespalib::GenerationHolder &getGenerationHolder() { return _genHolder; } diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java new file mode 100644 index 00000000000..e03e0b07944 --- /dev/null +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.Flushable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.common.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This RequestProcessor logs requests to disk. It batches the requests to do + * the io efficiently. The request is not passed to the next RequestProcessor + * until its log has been synced to disk. + * + * SyncRequestProcessor is used in 3 different cases + * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which + * send ack back to itself. + * 2. Follower - Sync request to disk and forward request to + * SendAckRequestProcessor which send the packets to leader. + * SendAckRequestProcessor is flushable which allow us to force + * push packets to leader. + * 3. Observer - Sync committed request to disk (received as INFORM packet). + * It never send ack back to the leader, so the nextProcessor will + * be null. This change the semantic of txnlog on the observer + * since it only contains committed txns. + */ +public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class); + + private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; + + private static class FlushRequest extends Request { + private final CountDownLatch latch = new CountDownLatch(1); + public FlushRequest() { + super(null, 0, 0, 0, null, null); + } + } + + private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); + private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); + + private static class DelayingProcessor implements RequestProcessor, Flushable { + private final RequestProcessor next; + private Queue<Request> delayed = null; + private DelayingProcessor(RequestProcessor next) { + this.next = next; + } + @Override + public void flush() throws IOException { + if (delayed == null && next instanceof Flushable) { + ((Flushable) next).flush(); + } + } + @Override + public void processRequest(Request request) throws RequestProcessorException { + if (delayed == null) { + next.processRequest(request); + } else { + delayed.add(request); + } + } + @Override + public void shutdown() { + next.shutdown(); + } + private void close() { + if (delayed == null) { + delayed = new ArrayDeque<>(); + } + } + private void open() throws RequestProcessorException { + if (delayed != null) { + for (Request request : delayed) { + next.processRequest(request); + } + delayed = null; + } + } + } + + /** The number of log entries to log before starting a snapshot */ + private static int snapCount = ZooKeeperServer.getSnapCount(); + + /** + * The total size of log entries before starting a snapshot + */ + private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes(); + + /** + * Random numbers used to vary snapshot timing + */ + private int randRoll; + private long randSize; + + private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); + + private final Semaphore snapThreadMutex = new Semaphore(1); + + private final ZooKeeperServer zks; + + private final DelayingProcessor nextProcessor; + + /** + * Transactions that have been written and are waiting to be flushed to + * disk. Basically this is the list of SyncItems whose callbacks will be + * invoked after flush returns successfully. + */ + private final Queue<Request> toFlush; + private long lastFlushTime; + + public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { + super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); + this.zks = zks; + this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor); + this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); + } + + /** + * used by tests to check for changing + * snapcounts + * @param count + */ + public static void setSnapCount(int count) { + snapCount = count; + } + + /** + * used by tests to get the snapcount + * @return the snapcount + */ + public static int getSnapCount() { + return snapCount; + } + + private long getRemainingDelay() { + long flushDelay = zks.getFlushDelay(); + long duration = Time.currentElapsedTime() - lastFlushTime; + if (duration < flushDelay) { + return flushDelay - duration; + } + return 0; + } + + /** If both flushDelay and maxMaxBatchSize are set (bigger than 0), flush + * whenever either condition is hit. If only one or the other is + * set, flush only when the relevant condition is hit. + */ + private boolean shouldFlush() { + long flushDelay = zks.getFlushDelay(); + long maxBatchSize = zks.getMaxBatchSize(); + if ((flushDelay > 0) && (getRemainingDelay() == 0)) { + return true; + } + return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize); + } + + /** + * used by tests to check for changing + * snapcounts + * @param size + */ + public static void setSnapSizeInBytes(long size) { + snapSizeInBytes = size; + } + + private boolean shouldSnapshot() { + int logCount = zks.getZKDatabase().getTxnCount(); + long logSize = zks.getZKDatabase().getTxnSize(); + return (logCount > (snapCount / 2 + randRoll)) + || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize)); + } + + private void resetSnapshotStats() { + randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2); + randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2)); + } + + @Override + public void run() { + try { + // we do this in an attempt to ensure that not all of the servers + // in the ensemble take a snapshot at the same time + resetSnapshotStats(); + lastFlushTime = Time.currentElapsedTime(); + while (true) { + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size()); + + long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay()); + Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS); + if (si == null) { + /* We timed out looking for more writes to batch, go ahead and flush immediately */ + flush(); + si = queuedRequests.take(); + } + + if (si == REQUEST_OF_DEATH) { + break; + } + + if (si == turnForwardingDelayOn) { + nextProcessor.close(); + continue; + } + if (si == turnForwardingDelayOff) { + nextProcessor.open(); + continue; + } + + if (si instanceof FlushRequest) { + flush(); + ((FlushRequest) si).latch.countDown(); + continue; + } + + long startProcessTime = Time.currentElapsedTime(); + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); + + // track the number of records written to the log + if (!si.isThrottled() && zks.getZKDatabase().append(si)) { + if (shouldSnapshot()) { + resetSnapshotStats(); + // roll the log + zks.getZKDatabase().rollLog(); + // take a snapshot + if (!snapThreadMutex.tryAcquire()) { + LOG.warn("Too busy to snap, skipping"); + } else { + new ZooKeeperThread("Snapshot Thread") { + public void run() { + try { + zks.takeSnapshot(); + } catch (Exception e) { + LOG.warn("Unexpected exception", e); + } finally { + snapThreadMutex.release(); + } + } + }.start(); + } + } + } else if (toFlush.isEmpty()) { + // optimization for read heavy workloads + // iff this is a read or a throttled request(which doesn't need to be written to the disk), + // and there are no pending flushes (writes), then just pass this to the next processor + if (nextProcessor != null) { + nextProcessor.processRequest(si); + nextProcessor.flush(); + } + continue; + } + toFlush.add(si); + if (shouldFlush()) { + flush(); + } + ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); + } + } catch (Throwable t) { + handleException(this.getName(), t); + } + LOG.info("SyncRequestProcessor exited!"); + } + + /** Flushes all pending writes, and waits for this to complete. */ + public void syncFlush() throws InterruptedException { + FlushRequest marker = new FlushRequest(); + queuedRequests.add(marker); + marker.latch.await(); + } + + public void setDelayForwarding(boolean delayForwarding) { + queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff); + } + + private void flush() throws IOException, RequestProcessorException { + if (this.toFlush.isEmpty()) { + return; + } + + ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size()); + + long flushStartTime = Time.currentElapsedTime(); + zks.getZKDatabase().commit(); + ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime); + + if (this.nextProcessor == null) { + this.toFlush.clear(); + } else { + while (!this.toFlush.isEmpty()) { + final Request i = this.toFlush.remove(); + long latency = Time.currentElapsedTime() - i.syncQueueStartTime; + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); + this.nextProcessor.processRequest(i); + } + nextProcessor.flush(); + } + lastFlushTime = Time.currentElapsedTime(); + } + + public void shutdown() { + LOG.info("Shutting down"); + queuedRequests.add(REQUEST_OF_DEATH); + try { + this.join(); + this.flush(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while wating for {} to finish", this); + Thread.currentThread().interrupt(); + } catch (IOException e) { + LOG.warn("Got IO exception during shutdown"); + } catch (RequestProcessorException e) { + LOG.warn("Got request processor exception during shutdown"); + } + if (nextProcessor != null) { + nextProcessor.shutdown(); + } + } + + public void processRequest(final Request request) { + Objects.requireNonNull(request, "Request cannot be null"); + + request.syncQueueStartTime = Time.currentElapsedTime(); + queuedRequests.add(request); + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1); + } + +} diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java new file mode 100644 index 00000000000..8e80fae57dc --- /dev/null +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -0,0 +1,920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.net.ssl.SSLSocket; +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.server.ExitCode; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.TxnLogEntry; +import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.ConfigUtils; +import org.apache.zookeeper.server.util.MessageTracker; +import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.server.util.ZxidUtils; +import org.apache.zookeeper.txn.SetDataTxn; +import org.apache.zookeeper.txn.TxnDigest; +import org.apache.zookeeper.txn.TxnHeader; +import org.apache.zookeeper.util.ServiceUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is the superclass of two of the three main actors in a ZK + * ensemble: Followers and Observers. Both Followers and Observers share + * a good deal of code which is moved into Peer to avoid duplication. + */ +public class Learner { + + static class PacketInFlight { + + TxnHeader hdr; + Record rec; + TxnDigest digest; + + } + + QuorumPeer self; + LearnerZooKeeperServer zk; + + protected BufferedOutputStream bufferedOutput; + + protected Socket sock; + protected MultipleAddresses leaderAddr; + protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false); + + /** + * Socket getter + */ + public Socket getSocket() { + return sock; + } + + LearnerSender sender = null; + protected InputArchive leaderIs; + protected OutputArchive leaderOs; + /** the protocol version of the leader */ + protected int leaderProtocolVersion = 0x01; + + private static final int BUFFERED_MESSAGE_SIZE = 10; + protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); + + protected static final Logger LOG = LoggerFactory.getLogger(Learner.class); + + /** + * Time to wait after connection attempt with the Leader or LearnerMaster before this + * Learner tries to connect again. + */ + private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100); + + private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); + + public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending"; + private static boolean asyncSending = + Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING)); + public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync"; + public static final boolean closeSocketAsync = Boolean + .parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC)); + + static { + LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs); + LOG.info("TCP NoDelay set to: {}", nodelay); + LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); + LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync); + } + + final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>(); + + public int getPendingRevalidationsCount() { + return pendingRevalidations.size(); + } + + // for testing + protected static void setAsyncSending(boolean newMode) { + asyncSending = newMode; + LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); + + } + protected static boolean getAsyncSending() { + return asyncSending; + } + /** + * validate a session for a client + * + * @param clientId + * the client to be revalidated + * @param timeout + * the timeout for which the session is valid + * @throws IOException + */ + void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException { + LOG.info("Revalidating client: 0x{}", Long.toHexString(clientId)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeLong(clientId); + dos.writeInt(timeout); + dos.close(); + QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null); + pendingRevalidations.put(clientId, cnxn); + if (LOG.isTraceEnabled()) { + ZooTrace.logTraceMessage( + LOG, + ZooTrace.SESSION_TRACE_MASK, + "To validate session 0x" + Long.toHexString(clientId)); + } + writePacket(qp, true); + } + + /** + * write a packet to the leader. + * + * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time. + * When packets are sent synchronously, writing is done within a synchronization block. + * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe. + * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only. + * So we have only one thread writing to leaderOs at a time in either case. + * + * @param pp + * the proposal packet to be sent to the leader + * @throws IOException + */ + void writePacket(QuorumPacket pp, boolean flush) throws IOException { + if (asyncSending) { + sender.queuePacket(pp); + } else { + writePacketNow(pp, flush); + } + } + + void writePacketNow(QuorumPacket pp, boolean flush) throws IOException { + synchronized (leaderOs) { + if (pp != null) { + messageTracker.trackSent(pp.getType()); + leaderOs.writeRecord(pp, "packet"); + } + if (flush) { + bufferedOutput.flush(); + } + } + } + + /** + * Start thread that will forward any packet in the queue to the leader + */ + protected void startSendingThread() { + sender = new LearnerSender(this); + sender.start(); + } + + /** + * read a packet from the leader + * + * @param pp + * the packet to be instantiated + * @throws IOException + */ + void readPacket(QuorumPacket pp) throws IOException { + synchronized (leaderIs) { + leaderIs.readRecord(pp, "packet"); + messageTracker.trackReceived(pp.getType()); + } + if (LOG.isTraceEnabled()) { + final long traceMask = + (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK + : ZooTrace.SERVER_PACKET_TRACE_MASK; + + ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp); + } + } + + /** + * send a request packet to the leader + * + * @param request + * the request from the client + * @throws IOException + */ + void request(Request request) throws IOException { + if (request.isThrottled()) { + LOG.error("Throttled request sent to leader: {}. Exiting", request); + ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue()); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream oa = new DataOutputStream(baos); + oa.writeLong(request.sessionId); + oa.writeInt(request.cxid); + oa.writeInt(request.type); + if (request.request != null) { + request.request.rewind(); + int len = request.request.remaining(); + byte[] b = new byte[len]; + request.request.get(b); + request.request.rewind(); + oa.write(b); + } + oa.close(); + QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); + writePacket(qp, true); + } + + /** + * Returns the address of the node we think is the leader. + */ + protected QuorumServer findLeader() { + QuorumServer leaderServer = null; + // Find the leader by id + Vote current = self.getCurrentVote(); + for (QuorumServer s : self.getView().values()) { + if (s.id == current.getId()) { + // Ensure we have the leader's correct IP address before + // attempting to connect. + s.recreateSocketAddresses(); + leaderServer = s; + break; + } + } + if (leaderServer == null) { + LOG.warn("Couldn't find the leader with id = {}", current.getId()); + } + return leaderServer; + } + + /** + * Overridable helper method to return the System.nanoTime(). + * This method behaves identical to System.nanoTime(). + */ + protected long nanoTime() { + return System.nanoTime(); + } + + /** + * Overridable helper method to simply call sock.connect(). This can be + * overriden in tests to fake connection success/failure for connectToLeader. + */ + protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException { + sock.connect(addr, timeout); + } + + /** + * Establish a connection with the LearnerMaster found by findLearnerMaster. + * Followers only connect to Leaders, Observers can connect to any active LearnerMaster. + * Retries until either initLimit time has elapsed or 5 tries have happened. + * @param multiAddr - the address of the Peer to connect to. + * @throws IOException - if the socket connection fails on the 5th attempt + * if there is an authentication failure while connecting to leader + */ + protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException { + + this.leaderAddr = multiAddr; + Set<InetSocketAddress> addresses; + if (self.isMultiAddressReachabilityCheckEnabled()) { + // even if none of the addresses are reachable, we want to try to establish connection + // see ZOOKEEPER-3758 + addresses = multiAddr.getAllReachableAddressesOrAll(); + } else { + addresses = multiAddr.getAllAddresses(); + } + ExecutorService executor = Executors.newFixedThreadPool(addresses.size()); + CountDownLatch latch = new CountDownLatch(addresses.size()); + AtomicReference<Socket> socket = new AtomicReference<>(null); + addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit); + + try { + latch.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while trying to connect to Leader", e); + } finally { + executor.shutdown(); + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + LOG.error("not all the LeaderConnector terminated properly"); + } + } catch (InterruptedException ie) { + LOG.error("Interrupted while terminating LeaderConnector executor.", ie); + } + } + + if (socket.get() == null) { + throw new IOException("Failed connect to " + multiAddr); + } else { + sock = socket.get(); + sockBeingClosed.set(false); + } + + self.authLearner.authenticate(sock, hostname); + + leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); + bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); + leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); + if (asyncSending) { + startSendingThread(); + } + } + + class LeaderConnector implements Runnable { + + private AtomicReference<Socket> socket; + private InetSocketAddress address; + private CountDownLatch latch; + + LeaderConnector(InetSocketAddress address, AtomicReference<Socket> socket, CountDownLatch latch) { + this.address = address; + this.socket = socket; + this.latch = latch; + } + + @Override + public void run() { + try { + Thread.currentThread().setName("LeaderConnector-" + address); + Socket sock = connectToLeader(); + + if (sock != null && sock.isConnected()) { + if (socket.compareAndSet(null, sock)) { + LOG.info("Successfully connected to leader, using address: {}", address); + } else { + LOG.info("Connection to the leader is already established, close the redundant connection"); + sock.close(); + } + } + + } catch (Exception e) { + LOG.error("Failed connect to {}", address, e); + } finally { + latch.countDown(); + } + } + + private Socket connectToLeader() throws IOException, X509Exception, InterruptedException { + Socket sock = createSocket(); + + // leader connection timeout defaults to tickTime * initLimit + int connectTimeout = self.tickTime * self.initLimit; + + // but if connectToLearnerMasterLimit is specified, use that value to calculate + // timeout instead of using the initLimit value + if (self.connectToLearnerMasterLimit > 0) { + connectTimeout = self.tickTime * self.connectToLearnerMasterLimit; + } + + int remainingTimeout; + long startNanoTime = nanoTime(); + + for (int tries = 0; tries < 5 && socket.get() == null; tries++) { + try { + // recalculate the init limit time because retries sleep for 1000 milliseconds + remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000); + if (remainingTimeout <= 0) { + LOG.error("connectToLeader exceeded on retries."); + throw new IOException("connectToLeader exceeded on retries."); + } + + sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout)); + if (self.isSslQuorum()) { + ((SSLSocket) sock).startHandshake(); + } + sock.setTcpNoDelay(nodelay); + break; + } catch (IOException e) { + remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000); + + if (remainingTimeout <= leaderConnectDelayDuringRetryMs) { + LOG.error( + "Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}", + tries, + remainingTimeout, + address, + e); + throw e; + } else if (tries >= 4) { + LOG.error( + "Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}", + tries, + remainingTimeout, + address, + e); + throw e; + } else { + LOG.warn( + "Unexpected exception, tries={}, remaining init limit={}, connecting to {}", + tries, + remainingTimeout, + address, + e); + sock = createSocket(); + } + } + Thread.sleep(leaderConnectDelayDuringRetryMs); + } + + return sock; + } + } + + /** + * Creating a simple or and SSL socket. + * This can be overridden in tests to fake already connected sockets for connectToLeader. + */ + protected Socket createSocket() throws X509Exception, IOException { + Socket sock; + if (self.isSslQuorum()) { + sock = self.getX509Util().createSSLSocket(); + } else { + sock = new Socket(); + } + sock.setSoTimeout(self.tickTime * self.initLimit); + return sock; + } + + /** + * Once connected to the leader or learner master, perform the handshake + * protocol to establish a following / observing connection. + * @param pktType + * @return the zxid the Leader sends for synchronization purposes. + * @throws IOException + */ + protected long registerWithLeader(int pktType) throws IOException { + /* + * Send follower info, including last zxid and sid + */ + long lastLoggedZxid = self.getLastLoggedZxid(); + QuorumPacket qp = new QuorumPacket(); + qp.setType(pktType); + qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); + + /* + * Add sid to payload + */ + LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion()); + ByteArrayOutputStream bsid = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); + boa.writeRecord(li, "LearnerInfo"); + qp.setData(bsid.toByteArray()); + + writePacket(qp, true); + readPacket(qp); + final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); + if (qp.getType() == Leader.LEADERINFO) { + // we are connected to a 1.0 server so accept the new epoch and read the next packet + leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); + byte[] epochBytes = new byte[4]; + final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); + if (newEpoch > self.getAcceptedEpoch()) { + wrappedEpochBytes.putInt((int) self.getCurrentEpoch()); + self.setAcceptedEpoch(newEpoch); + } else if (newEpoch == self.getAcceptedEpoch()) { + // since we have already acked an epoch equal to the leaders, we cannot ack + // again, but we still need to send our lastZxid to the leader so that we can + // sync with it if it does assume leadership of the epoch. + // the -1 indicates that this reply should not count as an ack for the new epoch + wrappedEpochBytes.putInt(-1); + } else { + throw new IOException("Leaders epoch, " + + newEpoch + + " is less than accepted epoch, " + + self.getAcceptedEpoch()); + } + QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); + writePacket(ackNewEpoch, true); + return ZxidUtils.makeZxid(newEpoch, 0); + } else { + if (newEpoch > self.getAcceptedEpoch()) { + self.setAcceptedEpoch(newEpoch); + } + if (qp.getType() != Leader.NEWLEADER) { + LOG.error("First packet should have been NEWLEADER"); + throw new IOException("First packet should have been NEWLEADER"); + } + return qp.getZxid(); + } + } + + /** + * Finally, synchronize our history with the Leader (if Follower) + * or the LearnerMaster (if Observer). + * @param newLeaderZxid + * @throws IOException + * @throws InterruptedException + */ + protected void syncWithLeader(long newLeaderZxid) throws Exception { + QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); + QuorumPacket qp = new QuorumPacket(); + long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); + + QuorumVerifier newLeaderQV = null; + + // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot + // For SNAP and TRUNC the snapshot is needed to save that history + boolean snapshotNeeded = true; + boolean syncSnapshot = false; + readPacket(qp); + Deque<Long> packetsCommitted = new ArrayDeque<>(); + Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>(); + Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); + synchronized (zk) { + if (qp.getType() == Leader.DIFF) { + LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); + self.setSyncMode(QuorumPeer.SyncMode.DIFF); + if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) { + LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading."); + snapshotNeeded = true; + syncSnapshot = true; + } else { + snapshotNeeded = false; + } + } else if (qp.getType() == Leader.SNAP) { + self.setSyncMode(QuorumPeer.SyncMode.SNAP); + LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid())); + // The leader is going to dump the database + // db is clear as part of deserializeSnapshot() + zk.getZKDatabase().deserializeSnapshot(leaderIs); + // ZOOKEEPER-2819: overwrite config node content extracted + // from leader snapshot with local config, to avoid potential + // inconsistency of config node content during rolling restart. + if (!self.isReconfigEnabled()) { + LOG.debug("Reset config node content from local config after deserialization of snapshot."); + zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); + } + String signature = leaderIs.readString("signature"); + if (!signature.equals("BenWasHere")) { + LOG.error("Missing signature. Got {}", signature); + throw new IOException("Missing signature"); + } + zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); + + // immediately persist the latest snapshot when there is txn log gap + syncSnapshot = true; + } else if (qp.getType() == Leader.TRUNC) { + //we need to truncate the log to the lastzxid of the leader + self.setSyncMode(QuorumPeer.SyncMode.TRUNC); + LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid())); + boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); + if (!truncated) { + // not able to truncate the log + LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid())); + ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); + } + zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); + + } else { + LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp)); + ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); + } + zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); + zk.createSessionTracker(); + + long lastQueued = 0; + + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 + // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) + // we need to make sure that we don't take the snapshot twice. + boolean isPreZAB1_0 = true; + //If we are not going to take the snapshot be sure the transactions are not applied in memory + // but written out to the transaction log + boolean writeToTxnLog = !snapshotNeeded; + TxnLogEntry logEntry; + // we are now going to start getting transactions to apply followed by an UPTODATE + outerLoop: + while (self.isRunning()) { + readPacket(qp); + switch (qp.getType()) { + case Leader.PROPOSAL: + PacketInFlight pif = new PacketInFlight(); + logEntry = SerializeUtils.deserializeTxn(qp.getData()); + pif.hdr = logEntry.getHeader(); + pif.rec = logEntry.getTxn(); + pif.digest = logEntry.getDigest(); + if (pif.hdr.getZxid() != lastQueued + 1) { + LOG.warn( + "Got zxid 0x{} expected 0x{}", + Long.toHexString(pif.hdr.getZxid()), + Long.toHexString(lastQueued + 1)); + } + lastQueued = pif.hdr.getZxid(); + + if (pif.hdr.getType() == OpCode.reconfig) { + SetDataTxn setDataTxn = (SetDataTxn) pif.rec; + QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8)); + self.setLastSeenQuorumVerifier(qv, true); + } + + packetsNotLogged.add(pif); + packetsNotCommitted.add(pif); + break; + case Leader.COMMIT: + case Leader.COMMITANDACTIVATE: + pif = packetsNotCommitted.peekFirst(); + if (pif.hdr.getZxid() != qp.getZxid()) { + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(qp.getZxid()), + Long.toHexString(pif.hdr.getZxid())); + } else { + if (qp.getType() == Leader.COMMITANDACTIVATE) { + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig( + qv, + ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), + true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); + } + } + if (!writeToTxnLog) { + zk.processTxn(pif.hdr, pif.rec); + packetsNotLogged.remove(); + packetsNotCommitted.remove(); + } else { + packetsNotCommitted.remove(); + packetsCommitted.add(qp.getZxid()); + } + } + break; + case Leader.INFORM: + case Leader.INFORMANDACTIVATE: + PacketInFlight packet = new PacketInFlight(); + + if (qp.getType() == Leader.INFORMANDACTIVATE) { + ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); + long suggestedLeaderId = buffer.getLong(); + byte[] remainingdata = new byte[buffer.remaining()]; + buffer.get(remainingdata); + logEntry = SerializeUtils.deserializeTxn(remainingdata); + packet.hdr = logEntry.getHeader(); + packet.rec = logEntry.getTxn(); + packet.digest = logEntry.getDigest(); + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); + } + } else { + logEntry = SerializeUtils.deserializeTxn(qp.getData()); + packet.rec = logEntry.getTxn(); + packet.hdr = logEntry.getHeader(); + packet.digest = logEntry.getDigest(); + // Log warning message if txn comes out-of-order + if (packet.hdr.getZxid() != lastQueued + 1) { + LOG.warn( + "Got zxid 0x{} expected 0x{}", + Long.toHexString(packet.hdr.getZxid()), + Long.toHexString(lastQueued + 1)); + } + lastQueued = packet.hdr.getZxid(); + } + if (!writeToTxnLog) { + // Apply to db directly if we haven't taken the snapshot + zk.processTxn(packet.hdr, packet.rec); + } else { + packetsNotLogged.add(packet); + packetsCommitted.add(qp.getZxid()); + } + + break; + case Leader.UPTODATE: + LOG.info("Learner received UPTODATE message"); + if (newLeaderQV != null) { + boolean majorChange = self.processReconfig(newLeaderQV, null, null, true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); + } + } + if (isPreZAB1_0) { + zk.takeSnapshot(syncSnapshot); + self.setCurrentEpoch(newEpoch); + } + self.setZooKeeperServer(zk); + self.adminServer.setZooKeeperServer(zk); + break outerLoop; + case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery + // means this is Zab 1.0 + LOG.info("Learner received NEWLEADER message"); + if (qp.getData() != null && qp.getData().length > 1) { + try { + QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8)); + self.setLastSeenQuorumVerifier(qv, true); + newLeaderQV = qv; + } catch (Exception e) { + e.printStackTrace(); + } + } + + if (snapshotNeeded) { + zk.takeSnapshot(syncSnapshot); + } + + self.setCurrentEpoch(newEpoch); + writeToTxnLog = true; + //Anything after this needs to go to the transaction log, not applied directly in memory + isPreZAB1_0 = false; + + // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). + sock.setSoTimeout(self.tickTime * self.syncLimit); + self.setSyncMode(QuorumPeer.SyncMode.NONE); + zk.startupWithoutServing(); + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(true); + for (PacketInFlight p : packetsNotLogged) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + packetsNotLogged.clear(); + fzk.syncProcessor.syncFlush(); + } + + writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); + + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(false); + fzk.syncProcessor.syncFlush(); + } + break; + } + } + } + ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); + writePacket(ack, true); + zk.startServing(); + /* + * Update the election vote here to ensure that all members of the + * ensemble report the same vote to new servers that start up and + * send leader election notifications to the ensemble. + * + * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732 + */ + self.updateElectionVote(newEpoch); + + // We need to log the stuff that came in between the snapshot and the uptodate + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + for (PacketInFlight p : packetsNotLogged) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + for (Long zxid : packetsCommitted) { + fzk.commit(zxid); + } + } else if (zk instanceof ObserverZooKeeperServer) { + // Similar to follower, we need to log requests between the snapshot + // and UPTODATE + ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; + for (PacketInFlight p : packetsNotLogged) { + Long zxid = packetsCommitted.peekFirst(); + if (p.hdr.getZxid() != zxid) { + // log warning message if there is no matching commit + // old leader send outstanding proposal to observer + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(zxid), + Long.toHexString(p.hdr.getZxid())); + continue; + } + packetsCommitted.remove(); + Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); + request.setTxn(p.rec); + request.setHdr(p.hdr); + request.setTxnDigest(p.digest); + ozk.commitRequest(request); + } + } else { + // New server type need to handle in-flight packets + throw new UnsupportedOperationException("Unknown server type"); + } + } + + protected void revalidate(QuorumPacket qp) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long sessionId = dis.readLong(); + boolean valid = dis.readBoolean(); + ServerCnxn cnxn = pendingRevalidations.remove(sessionId); + if (cnxn == null) { + LOG.warn("Missing session 0x{} for validation", Long.toHexString(sessionId)); + } else { + zk.finishSessionInit(cnxn, valid); + } + if (LOG.isTraceEnabled()) { + ZooTrace.logTraceMessage( + LOG, + ZooTrace.SESSION_TRACE_MASK, + "Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid); + } + } + + protected void ping(QuorumPacket qp) throws IOException { + // Send back the ping with our session data + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + Map<Long, Integer> touchTable = zk.getTouchSnapshot(); + for (Entry<Long, Integer> entry : touchTable.entrySet()) { + dos.writeLong(entry.getKey()); + dos.writeInt(entry.getValue()); + } + + QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo()); + writePacket(pingReply, true); + } + + /** + * Shutdown the Peer + */ + public void shutdown() { + self.setZooKeeperServer(null); + self.closeAllConnections(); + self.adminServer.setZooKeeperServer(null); + + if (sender != null) { + sender.shutdown(); + } + + closeSocket(); + // shutdown previous zookeeper + if (zk != null) { + // If we haven't finished SNAP sync, force fully shutdown + // to avoid potential inconsistency + zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP)); + } + } + + boolean isRunning() { + return self.isRunning() && zk.isRunning(); + } + + void closeSocket() { + if (sock != null) { + if (sockBeingClosed.compareAndSet(false, true)) { + if (closeSocketAsync) { + final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId()); + closingThread.setDaemon(true); + closingThread.start(); + } else { + closeSockSync(); + } + } + } + } + + void closeSockSync() { + try { + long startTime = Time.currentElapsedTime(); + if (sock != null) { + sock.close(); + sock = null; + } + ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime); + } catch (IOException e) { + LOG.warn("Ignoring error closing connection to leader", e); + } + } + +} diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index c1399e53083..3b7a9dfc331 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -67,7 +67,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable { LOG.warn("Closing connection to leader, exception during packet send", e); try { Socket socket = learner.sock; - if ( socket != null && ! learner.sock.isClosed()) { + if (socket != null && !socket.isClosed()) { learner.sock.close(); } } catch (IOException e1) { |