diff options
43 files changed, 1010 insertions, 425 deletions
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java index bd6c2607fc2..c9f6f67df6c 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java @@ -1,7 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.controller.api.integration.horizon; -import com.yahoo.slime.Slime; import java.io.InputStream; /** diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java index 422b8f22000..be8613f5eff 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java @@ -16,7 +16,9 @@ import com.yahoo.yolean.Exceptions; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.util.Optional; +import java.util.function.Supplier; import java.util.logging.Level; /** @@ -57,10 +59,10 @@ public class HorizonApiHandler extends LoggingRequestHandler { private HttpResponse get(HttpRequest request) { Path path = new Path(request.getUri()); - if (path.matches("/horizon/v1/config/dashboard/topFolders")) return new JsonInputStreamResponse(client.getTopFolders()); - if (path.matches("/horizon/v1/config/dashboard/file/{id}")) return new JsonInputStreamResponse(client.getDashboard(path.get("id"))); - if (path.matches("/horizon/v1/config/dashboard/favorite")) return new JsonInputStreamResponse(client.getFavorite(request.getProperty("user"))); - if (path.matches("/horizon/v1/config/dashboard/recent")) return new JsonInputStreamResponse(client.getRecent(request.getProperty("user"))); + if (path.matches("/horizon/v1/config/dashboard/topFolders")) return jsonInputStreamResponse(client::getTopFolders); + if (path.matches("/horizon/v1/config/dashboard/file/{id}")) return jsonInputStreamResponse(() -> client.getDashboard(path.get("id"))); + if (path.matches("/horizon/v1/config/dashboard/favorite")) return jsonInputStreamResponse(() -> client.getFavorite(request.getProperty("user"))); + if (path.matches("/horizon/v1/config/dashboard/recent")) return jsonInputStreamResponse(() -> client.getRecent(request.getProperty("user"))); return ErrorResponse.notFoundError("Nothing at " + path); } @@ -73,7 +75,7 @@ public class HorizonApiHandler extends LoggingRequestHandler { private HttpResponse put(HttpRequest request) { Path path = new Path(request.getUri()); - if (path.matches("/horizon/v1/config/user")) return new JsonInputStreamResponse(client.getUser()); + if (path.matches("/horizon/v1/config/user")) return jsonInputStreamResponse(client::getUser); return ErrorResponse.notFoundError("Nothing at " + path); } @@ -81,7 +83,7 @@ public class HorizonApiHandler extends LoggingRequestHandler { SecurityContext securityContext = getAttribute(request, SecurityContext.ATTRIBUTE_NAME, SecurityContext.class); try { byte[] data = TsdbQueryRewriter.rewrite(request.getData().readAllBytes(), securityContext.roles(), systemName); - return new JsonInputStreamResponse(client.getMetrics(data)); + return jsonInputStreamResponse(() -> client.getMetrics(data)); } catch (TsdbQueryRewriter.UnauthorizedException e) { return ErrorResponse.forbidden("Access denied"); } catch (IOException e) { @@ -96,6 +98,14 @@ public class HorizonApiHandler extends LoggingRequestHandler { .orElseThrow(() -> new IllegalArgumentException("Attribute '" + attributeName + "' was not set on request")); } + private JsonInputStreamResponse jsonInputStreamResponse(Supplier<InputStream> supplier) { + try (InputStream inputStream = supplier.get()) { + return new JsonInputStreamResponse(inputStream); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private static class JsonInputStreamResponse extends HttpResponse { private final InputStream jsonInputStream; diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java index dd4b62ee494..6136bcdfd3a 100644 --- a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java +++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java @@ -3,12 +3,14 @@ package com.yahoo.jdisc.http.filter.security.athenz; import com.google.inject.Inject; import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.http.HttpRequest; import com.yahoo.jdisc.http.filter.DiscFilterRequest; import com.yahoo.jdisc.http.filter.security.athenz.RequestResourceMapper.ResourceNameAndAction; import com.yahoo.jdisc.http.filter.security.base.JsonSecurityRequestFilterBase; import com.yahoo.vespa.athenz.api.AthenzAccessToken; import com.yahoo.vespa.athenz.api.AthenzIdentity; import com.yahoo.vespa.athenz.api.AthenzPrincipal; +import com.yahoo.vespa.athenz.api.AthenzRole; import com.yahoo.vespa.athenz.api.ZToken; import com.yahoo.vespa.athenz.tls.AthenzX509CertificateUtils; import com.yahoo.vespa.athenz.utils.AthenzIdentities; @@ -20,6 +22,7 @@ import java.security.cert.X509Certificate; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.logging.Level; @@ -56,16 +59,20 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { private final RequestResourceMapper requestResourceMapper; private final Metric metric; private final Set<AthenzIdentity> allowedProxyIdentities; + private final Optional<AthenzRole> readRole; + private final Optional<AthenzRole> writeRole; @Inject public AthenzAuthorizationFilter(AthenzAuthorizationFilterConfig config, RequestResourceMapper resourceMapper, Metric metric) { - this(config, resourceMapper, new DefaultZpe(), metric); + this(config, resourceMapper, new DefaultZpe(), metric, null, null); } public AthenzAuthorizationFilter(AthenzAuthorizationFilterConfig config, RequestResourceMapper resourceMapper, Zpe zpe, - Metric metric) { + Metric metric, + AthenzRole readRole, + AthenzRole writeRole) { this.roleTokenHeaderName = config.roleTokenHeaderName(); List<EnabledCredentials.Enum> enabledCredentials = config.enabledCredentials(); this.enabledCredentials = enabledCredentials.isEmpty() @@ -77,6 +84,8 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { this.allowedProxyIdentities = config.allowedProxyIdentities().stream() .map(AthenzIdentities::from) .collect(Collectors.toSet()); + this.readRole = Optional.ofNullable(readRole); + this.writeRole = Optional.ofNullable(writeRole); } @Override @@ -86,7 +95,7 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { requestResourceMapper.getResourceNameAndAction(request); log.log(Level.FINE, () -> String.format("Resource mapping for '%s': %s", request, resourceMapping)); if (resourceMapping.isEmpty()) { - incrementAcceptedMetrics(request, false); + incrementAcceptedMetrics(request, false, Optional.empty()); return Optional.empty(); } Result result = checkAccessAllowed(request, resourceMapping.get()); @@ -94,15 +103,15 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { setAttribute(request, RESULT_ATTRIBUTE, resultType.name()); if (resultType == AuthorizationResult.Type.ALLOW) { populateRequestWithResult(request, result); - incrementAcceptedMetrics(request, true); + incrementAcceptedMetrics(request, true, Optional.of(result)); return Optional.empty(); } log.log(Level.FINE, () -> String.format("Forbidden (403) for '%s': %s", request, resultType.name())); - incrementRejectedMetrics(request, FORBIDDEN, resultType.name()); + incrementRejectedMetrics(request, FORBIDDEN, resultType.name(), Optional.of(result)); return Optional.of(new ErrorResponse(FORBIDDEN, "Access forbidden: " + resultType.getDescription())); } catch (IllegalArgumentException e) { log.log(Level.FINE, () -> String.format("Unauthorized (401) for '%s': %s", request, e.getMessage())); - incrementRejectedMetrics(request, UNAUTHORIZED, "Unauthorized"); + incrementRejectedMetrics(request, UNAUTHORIZED, "Unauthorized", Optional.empty()); return Optional.of(new ErrorResponse(UNAUTHORIZED, e.getMessage())); } } @@ -130,33 +139,53 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { X509Certificate identityCertificate = getClientCertificate(request).get(); AthenzIdentity peerIdentity = AthenzIdentities.from(identityCertificate); if (allowedProxyIdentities.contains(peerIdentity)) { - return checkAccessWithProxiedAccessToken(resourceAndAction, accessToken, identityCertificate); + return checkAccessWithProxiedAccessToken(request, resourceAndAction, accessToken, identityCertificate); } else { var zpeResult = zpe.checkAccessAllowed( accessToken, identityCertificate, resourceAndAction.resourceName(), resourceAndAction.action()); - return new Result(ACCESS_TOKEN, peerIdentity, zpeResult); + return getResult(ACCESS_TOKEN, peerIdentity, zpeResult, request, resourceAndAction, mapToRequestPrivileges(accessToken.roles())); } } - private Result checkAccessWithProxiedAccessToken(ResourceNameAndAction resourceAndAction, AthenzAccessToken accessToken, X509Certificate identityCertificate) { + private Result getResult(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult, DiscFilterRequest request, ResourceNameAndAction resourceAndAction, List<String> privileges) { + String currentAction = resourceAndAction.action(); + String futureAction = resourceAndAction.futureAction(); + return new Result(credentialType, identity, zpeResult, privileges, currentAction, futureAction); + } + + private List<String> mapToRequestPrivileges(List<AthenzRole> roles) { + return roles.stream() + .map(this::rolePrivilege) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private String rolePrivilege(AthenzRole role) { + if (readRole.stream().anyMatch(role::equals)) return "read"; + if (writeRole.stream().anyMatch(role::equals)) return "write"; + return null; + } + + private Result checkAccessWithProxiedAccessToken(DiscFilterRequest request, ResourceNameAndAction resourceAndAction, AthenzAccessToken accessToken, X509Certificate identityCertificate) { AthenzIdentity proxyIdentity = AthenzIdentities.from(identityCertificate); log.log(Level.FINE, () -> String.format("Checking proxied access token. Proxy identity: '%s'. Allowed identities: %s", proxyIdentity, allowedProxyIdentities)); var zpeResult = zpe.checkAccessAllowed(accessToken, resourceAndAction.resourceName(), resourceAndAction.action()); - return new Result(ACCESS_TOKEN, AthenzIdentities.from(identityCertificate), zpeResult); + return getResult(ACCESS_TOKEN, AthenzIdentities.from(identityCertificate), zpeResult, request, resourceAndAction, mapToRequestPrivileges(accessToken.roles())); } private Result checkAccessWithRoleCertificate(DiscFilterRequest request, ResourceNameAndAction resourceAndAction) { X509Certificate roleCertificate = getClientCertificate(request).get(); var zpeResult = zpe.checkAccessAllowed(roleCertificate, resourceAndAction.resourceName(), resourceAndAction.action()); AthenzIdentity identity = AthenzX509CertificateUtils.getIdentityFromRoleCertificate(roleCertificate); - return new Result(ROLE_CERTIFICATE, identity, zpeResult); + AthenzX509CertificateUtils.getRolesFromRoleCertificate(roleCertificate).roleName(); + return getResult(ROLE_CERTIFICATE, identity, zpeResult, request, resourceAndAction, mapToRequestPrivileges(List.of(AthenzX509CertificateUtils.getRolesFromRoleCertificate(roleCertificate)))); } private Result checkAccessWithRoleToken(DiscFilterRequest request, ResourceNameAndAction resourceAndAction) { ZToken roleToken = getRoleToken(request); var zpeResult = zpe.checkAccessAllowed(roleToken, resourceAndAction.resourceName(), resourceAndAction.action()); - return new Result(ROLE_TOKEN, roleToken.getIdentity(), zpeResult); + return getResult(ROLE_TOKEN, roleToken.getIdentity(), zpeResult, request, resourceAndAction, mapToRequestPrivileges(roleToken.getRoles())); } private static boolean isAccessTokenPresent(DiscFilterRequest request) { @@ -246,20 +275,30 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { request.setAttribute(name, value); } - private void incrementAcceptedMetrics(DiscFilterRequest request, boolean authzRequired) { + private void incrementAcceptedMetrics(DiscFilterRequest request, boolean authzRequired, Optional<Result> result) { String hostHeader = request.getHeader("Host"); Metric.Context context = metric.createContext(Map.of( "endpoint", hostHeader != null ? hostHeader : "", - "authz-required", Boolean.toString(authzRequired))); + "authz-required", Boolean.toString(authzRequired), + "httpMethod", HttpRequest.Method.valueOf(request.getMethod()).name(), + "requestPrivileges", result.map(r -> String.join(",", r.requestPrivileges)).orElse(""), + "currentRequestMapping", result.map(r -> r.currentAction).orElse(""), + "futureRequestMapping", result.map(r -> r.futureAction).orElse("") + )); metric.add(ACCEPTED_METRIC_NAME, 1L, context); } - private void incrementRejectedMetrics(DiscFilterRequest request, int statusCode, String zpeCode) { + private void incrementRejectedMetrics(DiscFilterRequest request, int statusCode, String zpeCode, Optional<Result> result) { String hostHeader = request.getHeader("Host"); Metric.Context context = metric.createContext(Map.of( "endpoint", hostHeader != null ? hostHeader : "", "status-code", Integer.toString(statusCode), - "zpe-status", zpeCode)); + "zpe-status", zpeCode, + "httpMethod", HttpRequest.Method.valueOf(request.getMethod()), + "requestPrivileges", result.map(r -> String.join(",", r.requestPrivileges)).orElse(""), + "currentRequestMapping", result.map(r -> r.currentAction).orElse(""), + "futureRequestMapping", result.map(r -> r.futureAction).orElse("") + )); metric.add(REJECTED_METRIC_NAME, 1L, context); } @@ -267,11 +306,17 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase { final EnabledCredentials.Enum credentialType; final AthenzIdentity identity; final AuthorizationResult zpeResult; + final List<String> requestPrivileges; + final String currentAction; + final String futureAction; - Result(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult) { + public Result(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult, List<String> requestPrivileges, String currentAction, String futureAction) { this.credentialType = credentialType; this.identity = identity; this.zpeResult = zpeResult; + this.requestPrivileges = requestPrivileges; + this.currentAction = currentAction; + this.futureAction = futureAction; } } } diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java index 56c52bd71c4..c962e973959 100644 --- a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java +++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java @@ -28,10 +28,15 @@ public interface RequestResourceMapper { class ResourceNameAndAction { private final AthenzResourceName resourceName; private final String action; + private final String futureAction; public ResourceNameAndAction(AthenzResourceName resourceName, String action) { + this(resourceName, action, action); + } + public ResourceNameAndAction(AthenzResourceName resourceName, String action, String futureAction) { this.resourceName = resourceName; this.action = action; + this.futureAction = futureAction; } public AthenzResourceName resourceName() { @@ -42,6 +47,14 @@ public interface RequestResourceMapper { return action; } + public ResourceNameAndAction withFutureAction(String futureAction) { + return new ResourceNameAndAction(resourceName, action, futureAction); + } + + public String futureAction() { + return futureAction; + } + @Override public String toString() { return "ResourceNameAndAction{" + diff --git a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java index bfe02d1f279..137e4653670 100644 --- a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java +++ b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java @@ -296,7 +296,9 @@ public class AthenzAuthorizationFilterTest { .allowedProxyIdentities(allowedProxyIdentities)), new StaticRequestResourceMapper(RESOURCE_NAME, ACTION), zpe, - metric); + metric, + new AthenzRole("domain","reader"), + new AthenzRole("domain", "writer")); } private static void assertAuthorizationResult(DiscFilterRequest request, Type expectedResult) { diff --git a/metrics/src/tests/summetrictest.cpp b/metrics/src/tests/summetrictest.cpp index e3d58659daf..d0380a630f1 100644 --- a/metrics/src/tests/summetrictest.cpp +++ b/metrics/src/tests/summetrictest.cpp @@ -125,4 +125,45 @@ TEST(SumMetricTest, test_start_value) EXPECT_EQ(int64_t(60), sum.getLongValue("value")); } +namespace { + +struct MetricSetWithSum : public MetricSet +{ + LongValueMetric _v1; + LongValueMetric _v2; + SumMetric<LongValueMetric> _sum; + MetricSetWithSum(); + ~MetricSetWithSum() override; +}; + +MetricSetWithSum::MetricSetWithSum() + : MetricSet("MetricSetWithSum", {}, ""), + _v1("v1", {}, "", this), + _v2("v2", {}, "", this), + _sum("sum", {}, "", this) +{ + _sum.addMetricToSum(_v1); + _sum.addMetricToSum(_v2); +} + +MetricSetWithSum::~MetricSetWithSum() = default; + +} + +TEST(SumMetricTest, test_nested_sum) +{ + MetricSetWithSum w1; + MetricSetWithSum w2; + MetricSetWithSum sum; + w1._v1.addValue(10); + w1._v2.addValue(13); + w2._v1.addValue(27); + w2._v2.addValue(29); + w1.addToPart(sum); + w2.addToPart(sum); + EXPECT_EQ(int64_t(37), sum._v1.getLongValue("value")); + EXPECT_EQ(int64_t(42), sum._v2.getLongValue("value")); + EXPECT_EQ(int64_t(79), sum._sum.getLongValue("value")); +} + } diff --git a/metrics/src/vespa/metrics/metric.cpp b/metrics/src/vespa/metrics/metric.cpp index a8d8194b26d..50fc36c62cb 100644 --- a/metrics/src/vespa/metrics/metric.cpp +++ b/metrics/src/vespa/metrics/metric.cpp @@ -232,4 +232,11 @@ Metric::assignValues(const Metric& m) { assert(ownerList.empty()); return this; } + +bool +Metric::is_sum_metric() const +{ + return false; +} + } // metrics diff --git a/metrics/src/vespa/metrics/metric.h b/metrics/src/vespa/metrics/metric.h index 10b74a2da22..c8fb3031278 100644 --- a/metrics/src/vespa/metrics/metric.h +++ b/metrics/src/vespa/metrics/metric.h @@ -247,6 +247,8 @@ public: virtual bool isMetricSet() const { return false; } + virtual bool is_sum_metric() const; + private: /** diff --git a/metrics/src/vespa/metrics/summetric.h b/metrics/src/vespa/metrics/summetric.h index f04c1696638..7b60c968e5b 100644 --- a/metrics/src/vespa/metrics/summetric.h +++ b/metrics/src/vespa/metrics/summetric.h @@ -69,6 +69,7 @@ public: void printDebug(std::ostream&, const std::string& indent="") const override; void addToPart(Metric&) const override; void addToSnapshot(Metric&, std::vector<Metric::UP> &) const override; + bool is_sum_metric() const override; private: friend struct MetricManagerTest; diff --git a/metrics/src/vespa/metrics/summetric.hpp b/metrics/src/vespa/metrics/summetric.hpp index 9520456a974..e067b9643c2 100644 --- a/metrics/src/vespa/metrics/summetric.hpp +++ b/metrics/src/vespa/metrics/summetric.hpp @@ -142,8 +142,17 @@ template<typename AddendMetric> void SumMetric<AddendMetric>::addToPart(Metric& m) const { - std::pair<std::vector<Metric::UP>, Metric::UP> sum(generateSum()); - sum.second->addToPart(m); + if (!m.is_sum_metric()) { + std::pair<std::vector<Metric::UP>, Metric::UP> sum(generateSum()); + sum.second->addToPart(m); + } +} + +template<typename AddendMetric> +bool +SumMetric<AddendMetric>::is_sum_metric() const +{ + return true; } template<typename AddendMetric> diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java index 47337518a65..39183688340 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java @@ -20,19 +20,22 @@ import java.util.Set; import java.util.stream.Collectors; /** - * The application maintainer detects manual operator changes to nodes and redeploys affected applications. - * The purpose of this is to redeploy affected applications faster than achieved by the regular application - * maintenance to reduce the time period where the node repository and the application model is out of sync. + * This maintainer detects changes to nodes that must be expedited, and redeploys affected applications. + * + * The purpose of this is to redeploy affected applications faster than achieved by + * {@link PeriodicApplicationMaintainer}, to reduce the time period where the node repository and the application model + * is out of sync. * * Why can't the manual change directly make the application redeployment? - * Because the redeployment must run at the right config server, while the node state change may be running - * at any config server. + * + * Because we want to queue redeployments to avoid overloading config servers. * * @author bratseth + * @author mpolden */ -public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer { +public class ExpeditedChangeApplicationMaintainer extends ApplicationMaintainer { - OperatorChangeApplicationMaintainer(Deployer deployer, Metric metric, NodeRepository nodeRepository, Duration interval) { + ExpeditedChangeApplicationMaintainer(Deployer deployer, Metric metric, NodeRepository nodeRepository, Duration interval) { super(deployer, metric, nodeRepository, interval); } @@ -57,7 +60,7 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer { boolean deployed = deployWithLock(application); if (deployed) log.info("Redeployed application " + application.toShortString() + - " as a manual change was made to its nodes"); + " as an expedited change was made to its nodes"); } private boolean hasNodesWithChanges(ApplicationId applicationId, NodeList nodes) { @@ -66,7 +69,7 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer { return nodes.stream() .flatMap(node -> node.history().events().stream()) - .filter(event -> event.agent() == Agent.operator) + .filter(event -> expediteChangeBy(event.agent())) .map(History.Event::at) .anyMatch(e -> lastDeployTime.get().isBefore(e)); } @@ -84,5 +87,14 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer { .groupingBy(node -> node.allocation().get().owner()); } + /** Returns whether to expedite changes performed by agent */ + private boolean expediteChangeBy(Agent agent) { + switch (agent) { + case operator: + case RebuildingOsUpgrader: + case HostEncrypter: return true; + } + return false; + } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java index 4537d99d107..9f84322fe0f 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java @@ -200,8 +200,10 @@ public class MetricsReporter extends NodeRepositoryMaintainer { metric.set("wantToDeprovision", node.status().wantToDeprovision() ? 1 : 0, context); metric.set("failReport", NodeFailer.reasonsToFailParentHost(node).isEmpty() ? 0 : 1, context); - metric.set("wantToEncrypt", node.reports().getReport("wantToEncrypt").isPresent() ? 1 : 0, context); - metric.set("diskEncrypted", node.reports().getReport("diskEncrypted").isPresent() ? 1 : 0, context); + if (node.type().isHost()) { + metric.set("wantToEncrypt", node.reports().getReport("wantToEncrypt").isPresent() ? 1 : 0, context); + metric.set("diskEncrypted", node.reports().getReport("diskEncrypted").isPresent() ? 1 : 0, context); + } HostName hostname = new HostName(node.hostname()); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java index 40f9d17519c..cdb5202603a 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java @@ -48,7 +48,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { maintainers.add(new NodeFailer(deployer, nodeRepository, defaults.failGrace, defaults.nodeFailerInterval, orchestrator, defaults.throttlePolicy, metric)); maintainers.add(new NodeHealthTracker(hostLivenessTracker, serviceMonitor, nodeRepository, defaults.nodeFailureStatusUpdateInterval, metric)); - maintainers.add(new OperatorChangeApplicationMaintainer(deployer, metric, nodeRepository, defaults.operatorChangeRedeployInterval)); + maintainers.add(new ExpeditedChangeApplicationMaintainer(deployer, metric, nodeRepository, defaults.expeditedChangeRedeployInterval)); maintainers.add(new ReservationExpirer(nodeRepository, defaults.reservationExpiry, metric)); maintainers.add(new RetiredExpirer(nodeRepository, orchestrator, deployer, metric, defaults.retiredInterval, defaults.retiredExpiry)); maintainers.add(new InactiveExpirer(nodeRepository, defaults.inactiveExpiry, Map.of(NodeType.config, defaults.inactiveConfigServerExpiry, @@ -92,7 +92,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { /** Time between each run of maintainer that does periodic redeployment */ private final Duration redeployMaintainerInterval; /** Applications are redeployed after manual operator changes within this time period */ - private final Duration operatorChangeRedeployInterval; + private final Duration expeditedChangeRedeployInterval; /** The time a node must be continuously unresponsive before it is failed */ private final Duration failGrace; @@ -135,7 +135,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { nodeFailerInterval = Duration.ofMinutes(15); nodeFailureStatusUpdateInterval = Duration.ofMinutes(2); nodeMetricsCollectionInterval = Duration.ofMinutes(1); - operatorChangeRedeployInterval = Duration.ofMinutes(3); + expeditedChangeRedeployInterval = Duration.ofMinutes(3); // Vespa upgrade frequency is higher in CD so (de)activate OS upgrades more frequently as well osUpgradeActivatorInterval = zone.system().isCd() ? Duration.ofSeconds(30) : Duration.ofMinutes(5); periodicRedeployInterval = Duration.ofMinutes(60); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java index db6aebacddc..62d09c99f16 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java @@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals; /** * @author bratseth */ -public class OperatorChangeApplicationMaintainerTest { +public class ExpeditedChangeApplicationMaintainerTest { @Test public void test_application_maintenance() { @@ -42,10 +42,10 @@ public class OperatorChangeApplicationMaintainerTest { // Create applications fixture.activate(); assertEquals("Initial applications are deployed", 3, fixture.deployer.redeployments); - OperatorChangeApplicationMaintainer maintainer = new OperatorChangeApplicationMaintainer(fixture.deployer, - new TestMetric(), - nodeRepository, - Duration.ofMinutes(1)); + ExpeditedChangeApplicationMaintainer maintainer = new ExpeditedChangeApplicationMaintainer(fixture.deployer, + new TestMetric(), + nodeRepository, + Duration.ofMinutes(1)); clock.advance(Duration.ofMinutes(2)); maintainer.maintain(); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java index 4251d1276f8..7fa6810c1ba 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java @@ -43,6 +43,7 @@ import java.util.TreeMap; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -113,8 +114,6 @@ public class MetricsReporterTest { expectedMetrics.put("wantToDeprovision", 0); expectedMetrics.put("failReport", 0); - expectedMetrics.put("wantToEncrypt", 0); - expectedMetrics.put("diskEncrypted", 0); expectedMetrics.put("allowedToBeDown", 1); expectedMetrics.put("suspended", 1); @@ -150,6 +149,27 @@ public class MetricsReporterTest { assertEquals(expectedMetrics, new TreeMap<>(metric.values)); } + @Test + public void test_registered_metrics_for_host() { + NodeFlavors nodeFlavors = FlavorConfigBuilder.createDummies("default"); + Orchestrator orchestrator = mock(Orchestrator.class); + when(orchestrator.getHostInfo(eq(reference), any())).thenReturn( + HostInfo.createSuspended(HostStatus.ALLOWED_TO_BE_DOWN, Instant.ofEpochSecond(1))); + ProvisioningTester tester = new ProvisioningTester.Builder().flavors(nodeFlavors.getFlavors()).orchestrator(orchestrator).build(); + tester.makeProvisionedNodes(1, "default", NodeType.host, 0); + + tester.clock().setInstant(Instant.ofEpochSecond(124)); + + TestMetric metric = new TestMetric(); + MetricsReporter metricsReporter = metricsReporter(metric, tester); + metricsReporter.maintain(); + + // Only verify metrics that are set for hosts + TreeMap<String, Number> metrics = new TreeMap<>(metric.values); + assertTrue(metrics.containsKey("wantToEncrypt")); + assertTrue(metrics.containsKey("diskEncrypted")); + } + private void verifyAndRemoveIntegerMetricSum(TestMetric metric, String key, int expected) { assertEquals(expected, (int) metric.sumNumberValues(key)); metric.remove(key); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json index 73ac692e37b..26d711945c6 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json @@ -7,6 +7,9 @@ "name": "DirtyExpirer" }, { + "name": "ExpeditedChangeApplicationMaintainer" + }, + { "name": "FailedExpirer" }, { @@ -37,9 +40,6 @@ "name": "NodeRebooter" }, { - "name": "OperatorChangeApplicationMaintainer" - }, - { "name": "OsUpgradeActivator" }, { diff --git a/searchlib/src/tests/predicate/predicate_index_test.cpp b/searchlib/src/tests/predicate/predicate_index_test.cpp index facf0054c4a..19ad0301b5c 100644 --- a/searchlib/src/tests/predicate/predicate_index_test.cpp +++ b/searchlib/src/tests/predicate/predicate_index_test.cpp @@ -113,7 +113,6 @@ TEST("require that PredicateIndex can index document") { EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid()); indexFeature(index, doc_id, min_feature, {{hash, interval}}, {}); index.commit(); - auto posting_it = lookupPosting(index, hash); EXPECT_EQUAL(doc_id, posting_it.getKey()); uint32_t size; @@ -123,6 +122,25 @@ TEST("require that PredicateIndex can index document") { EXPECT_EQUAL(interval, interval_list[0]); } +TEST("require that bit vector cache is initialized correctly") { + BitVectorCache::KeyAndCountSet keySet; + keySet.emplace_back(hash, dummy_provider.getDocIdLimit()/2); + PredicateIndex index(generation_holder, dummy_provider, simple_index_config, 10); + EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid()); + indexFeature(index, doc_id, min_feature, {{hash, interval}}, {}); + index.requireCachePopulation(); + index.populateIfNeeded(dummy_provider.getDocIdLimit()); + EXPECT_TRUE(index.lookupCachedSet(keySet).empty()); + index.commit(); + EXPECT_TRUE(index.getIntervalIndex().lookup(hash).valid()); + EXPECT_TRUE(index.lookupCachedSet(keySet).empty()); + + index.requireCachePopulation(); + index.populateIfNeeded(dummy_provider.getDocIdLimit()); + EXPECT_FALSE(index.lookupCachedSet(keySet).empty()); +} + + TEST("require that PredicateIndex can index document with bounds") { PredicateIndex index(generation_holder, dummy_provider, simple_index_config, 10); EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid()); diff --git a/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp b/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp index 3dd2ec26dea..5b8d5f5b9ce 100644 --- a/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp +++ b/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp @@ -86,8 +86,7 @@ TEST_F("require that blueprint with empty index estimates empty.", Fixture) { EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits); } -TEST_F("require that blueprint with zero-constraint doc estimates non-empty.", - Fixture) { +TEST_F("require that blueprint with zero-constraint doc estimates non-empty.", Fixture) { f.indexEmptyDocument(42); PredicateBlueprint blueprint(f.field, f.guard(), f.query); EXPECT_FALSE(blueprint.getState().estimate().empty); @@ -98,11 +97,9 @@ const int min_feature = 1; const uint32_t doc_id = 2; const uint32_t interval = 0x0001ffff; -TEST_F("require that blueprint with posting list entry estimates non-empty.", - Fixture) { +TEST_F("require that blueprint with posting list entry estimates non-empty.", Fixture) { PredicateTreeAnnotations annotations(min_feature); - annotations.interval_map[PredicateHash::hash64("key=value")] = - std::vector<Interval>{{interval}}; + annotations.interval_map[PredicateHash::hash64("key=value")] = std::vector<Interval>{{interval}}; f.indexDocument(doc_id, annotations); PredicateBlueprint blueprint(f.field, f.guard(), f.query); @@ -110,8 +107,7 @@ TEST_F("require that blueprint with posting list entry estimates non-empty.", EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits); } -TEST_F("require that blueprint with 'bounds' posting list entry estimates " - "non-empty.", Fixture) { +TEST_F("require that blueprint with 'bounds' posting list entry estimates non-empty.", Fixture) { PredicateTreeAnnotations annotations(min_feature); annotations.bounds_map[PredicateHash::hash64("range_key=40")] = std::vector<IntervalWithBounds>{{interval, 0x80000003}}; @@ -122,34 +118,50 @@ TEST_F("require that blueprint with 'bounds' posting list entry estimates " EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits); } -TEST_F("require that blueprint with zstar-compressed estimates non-empty.", - Fixture) { +TEST_F("require that blueprint with zstar-compressed estimates non-empty.", Fixture) { PredicateTreeAnnotations annotations(1); - annotations.interval_map[Constants::z_star_compressed_hash] =std::vector<Interval>{{0xfffe0000}}; + annotations.interval_map[Constants::z_star_compressed_hash] = std::vector<Interval>{{0xfffe0000}}; f.indexDocument(doc_id, annotations); PredicateBlueprint blueprint(f.field, f.guard(), f.query); EXPECT_FALSE(blueprint.getState().estimate().empty); EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits); } -TEST_F("require that blueprint can create search", Fixture) { - PredicateTreeAnnotations annotations(1); - annotations.interval_map[PredicateHash::hash64("key=value")] =std::vector<Interval>{{interval}}; - f.indexDocument(doc_id, annotations); - +void +runQuery(Fixture & f, std::vector<uint32_t> expected, bool expectCachedSize, uint32_t expectedKV) { PredicateBlueprint blueprint(f.field, f.guard(), f.query); blueprint.fetchPostings(ExecuteInfo::TRUE); + EXPECT_EQUAL(expectCachedSize, blueprint.getCachedFeatures().size()); + for (uint32_t docId : expected) { + EXPECT_EQUAL(expectedKV, uint32_t(blueprint.getKV()[docId])); + } TermFieldMatchDataArray tfmda; SearchIterator::UP it = blueprint.createLeafSearch(tfmda, true); ASSERT_TRUE(it.get()); it->initFullRange(); EXPECT_EQUAL(SearchIterator::beginId(), it->getDocId()); - EXPECT_FALSE(it->seek(doc_id - 1)); - EXPECT_EQUAL(doc_id, it->getDocId()); - EXPECT_TRUE(it->seek(doc_id)); - EXPECT_EQUAL(doc_id, it->getDocId()); - EXPECT_FALSE(it->seek(doc_id + 1)); - EXPECT_TRUE(it->isAtEnd()); + std::vector<uint32_t> actual; + for (it->seek(1); ! it->isAtEnd(); it->seek(it->getDocId()+1)) { + actual.push_back(it->getDocId()); + } + EXPECT_EQUAL(expected.size(), actual.size()); + for (size_t i(0); i < expected.size(); i++) { + EXPECT_EQUAL(expected[i], actual[i]); + } +} + +TEST_F("require that blueprint can create search", Fixture) { + PredicateTreeAnnotations annotations(1); + annotations.interval_map[PredicateHash::hash64("key=value")] = std::vector<Interval>{{interval}}; + for (size_t i(0); i < 9; i++) { + f.indexDocument(doc_id + i, annotations); + } + runQuery(f, {2,3,4,5,6,7,8,9,10}, 0, 1); + f.indexDocument(doc_id+9, annotations); + runQuery(f, {2, 3,4,5,6,7,8,9,10,11}, 0, 1); + f.index().requireCachePopulation(); + f.indexDocument(doc_id+10, annotations); + runQuery(f, {2,3,4,5,6,7,8,9,10,11,12}, 1, 1); } TEST_F("require that blueprint can create more advanced search", Fixture) { diff --git a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp index c4a0e036a01..555117126a9 100644 --- a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp +++ b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp @@ -100,8 +100,8 @@ PredicateAttribute::getValueCount(DocId) const void PredicateAttribute::onCommit() { - populateIfNeeded(); _index->commit(); + populateIfNeeded(); incGeneration(); } diff --git a/searchlib/src/vespa/searchlib/common/bitvectorcache.h b/searchlib/src/vespa/searchlib/common/bitvectorcache.h index f81fd0163d8..a642d66f42f 100644 --- a/searchlib/src/vespa/searchlib/common/bitvectorcache.h +++ b/searchlib/src/vespa/searchlib/common/bitvectorcache.h @@ -41,6 +41,7 @@ public: void adjustDocIdLimit(uint32_t docId); void populate(uint32_t count, const PopulateInterface &); bool needPopulation() const { return _needPopulation; } + void requirePopulation() { _needPopulation = true; } private: class KeyMeta { public: diff --git a/searchlib/src/vespa/searchlib/predicate/predicate_index.h b/searchlib/src/vespa/searchlib/predicate/predicate_index.h index f4c89a2b369..49bf77f2fcc 100644 --- a/searchlib/src/vespa/searchlib/predicate/predicate_index.h +++ b/searchlib/src/vespa/searchlib/predicate/predicate_index.h @@ -54,8 +54,6 @@ private: template <typename IntervalT> void indexDocumentFeatures(uint32_t doc_id, const FeatureMap<IntervalT> &interval_map); - PopulateInterface::Iterator::UP lookup(uint64_t key) const override; - public: PredicateIndex(GenerationHolder &genHolder, const DocIdLimitProvider &limit_provider, @@ -105,6 +103,9 @@ public: * Adjust size of structures to have space for docId. */ void adjustDocIdLimit(uint32_t docId); + PopulateInterface::Iterator::UP lookup(uint64_t key) const override; + // Exposed for testing + void requireCachePopulation() const { _cache.requirePopulation(); } }; extern template class SimpleIndex<vespalib::datastore::EntryRef>; diff --git a/searchlib/src/vespa/searchlib/predicate/simple_index.h b/searchlib/src/vespa/searchlib/predicate/simple_index.h index 1398bb0817c..75dc540f787 100644 --- a/searchlib/src/vespa/searchlib/predicate/simple_index.h +++ b/searchlib/src/vespa/searchlib/predicate/simple_index.h @@ -168,8 +168,6 @@ private: } public: - SimpleIndex(GenerationHolder &generation_holder, const DocIdLimitProvider &provider) : - SimpleIndex(generation_holder, provider, SimpleIndexConfig()) {} SimpleIndex(GenerationHolder &generation_holder, const DocIdLimitProvider &provider, const SimpleIndexConfig &config) : _generation_holder(generation_holder), _config(config), _limit_provider(provider) {} diff --git a/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h b/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h index 9609cd4f6c9..ef225e86c50 100644 --- a/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h +++ b/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h @@ -50,8 +50,11 @@ public: void fetchPostings(const ExecuteInfo &execInfo) override; SearchIterator::UP - createLeafSearch(const fef::TermFieldMatchDataArray &tfmda, - bool strict) const override; + createLeafSearch(const fef::TermFieldMatchDataArray &tfmda, bool strict) const override; + + // Exposed for testing + const BitVectorCache::CountVector & getKV() const { return _kV; } + const BitVectorCache::KeySet & getCachedFeatures() const { return _cachedFeatures; } private: using BTreeIterator = predicate::SimpleIndex<vespalib::datastore::EntryRef>::BTreeIterator; using VectorIterator = predicate::SimpleIndex<vespalib::datastore::EntryRef>::VectorIterator; @@ -70,24 +73,24 @@ private: void addZeroConstraintToK(); std::vector<predicate::PredicatePostingList::UP> createPostingLists() const; - const PredicateAttribute & _attribute; + const PredicateAttribute & _attribute; const predicate::PredicateIndex &_index; - Alloc _kVBacking; - BitVectorCache::CountVector _kV; - BitVectorCache::KeySet _cachedFeatures; + Alloc _kVBacking; + BitVectorCache::CountVector _kV; + BitVectorCache::KeySet _cachedFeatures; - std::vector<IntervalEntry> _interval_dict_entries; - std::vector<BoundsEntry> _bounds_dict_entries; - vespalib::datastore::EntryRef _zstar_dict_entry; + std::vector<IntervalEntry> _interval_dict_entries; + std::vector<BoundsEntry> _bounds_dict_entries; + vespalib::datastore::EntryRef _zstar_dict_entry; - std::vector<IntervalIteratorEntry<BTreeIterator>> _interval_btree_iterators; + std::vector<IntervalIteratorEntry<BTreeIterator>> _interval_btree_iterators; std::vector<IntervalIteratorEntry<VectorIterator>> _interval_vector_iterators; - std::vector<BoundsIteratorEntry<BTreeIterator>> _bounds_btree_iterators; - std::vector<BoundsIteratorEntry<VectorIterator>> _bounds_vector_iterators; + std::vector<BoundsIteratorEntry<BTreeIterator>> _bounds_btree_iterators; + std::vector<BoundsIteratorEntry<VectorIterator>> _bounds_vector_iterators; // The zstar iterator is either a vector or a btree iterator. - optional<BTreeIterator> _zstar_btree_iterator; + optional<BTreeIterator> _zstar_btree_iterator; optional<VectorIterator> _zstar_vector_iterator; - bool _fetch_postings_done; + bool _fetch_postings_done; }; } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 4f6e2d5016b..c94c1a415b0 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -748,7 +748,7 @@ void DistributorStripe::send_updated_host_info_if_required() { if (_use_legacy_mode) { _component.getStateUpdater().immediately_send_get_node_state_replies(); } else { - _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(0); // TODO STRIPE correct stripe index! + _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(_stripe_index); } _must_send_updated_host_info = false; } diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java index c085be7c205..561b20a9c8a 100644 --- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java +++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java @@ -6,7 +6,10 @@ import com.auth0.jwt.interfaces.DecodedJWT; import com.yahoo.vespa.athenz.utils.AthenzIdentities; import java.time.Instant; +import java.util.List; import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; /** * Represents an Athenz Access Token @@ -18,6 +21,8 @@ public class AthenzAccessToken { public static final String HTTP_HEADER_NAME = "Authorization"; private static final String BEARER_TOKEN_PREFIX = "Bearer "; + private static final String SCOPE_CLAIM = "scp"; + private static final String AUDIENCE_CLAIM = "aud"; private final String value; private volatile DecodedJWT jwt; @@ -43,6 +48,12 @@ public class AthenzAccessToken { return jwt().getExpiresAt().toInstant(); } public AthenzIdentity getAthenzIdentity() { return AthenzIdentities.from(jwt().getClaim("client_id").asString()); } + public List<AthenzRole> roles() { + String domain = Optional.ofNullable(jwt().getClaim(AUDIENCE_CLAIM).asString()).orElse(""); + return Optional.ofNullable(jwt().getClaim(SCOPE_CLAIM).asList(String.class)).orElse(List.of()).stream() + .map(role -> new AthenzRole(domain, role)) + .collect(Collectors.toList()); + } private DecodedJWT jwt() { if (jwt == null) { diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java index 5aff8dfc25d..83abe0bb872 100644 --- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java +++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java @@ -84,7 +84,7 @@ public class CliClient { builder.setHostnameVerifier(AcceptAllHostnameVerifier.INSTANCE); } cliArgs.certificateAndKey().ifPresent(c -> builder.setCertificate(c.certificateFile, c.privateKeyFile)); - cliArgs.caCertificates().ifPresent(builder::setCaCertificates); + cliArgs.caCertificates().ifPresent(builder::setCaCertificatesFile); cliArgs.headers().forEach(builder::addRequestHeader); return builder.build(); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java index 672f5f080b5..163da14cead 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java @@ -127,7 +127,7 @@ class ApacheCluster implements Cluster { .setInitialWindowSize(Integer.MAX_VALUE) .build()); - SSLContext sslContext = constructSslContext(builder); + SSLContext sslContext = builder.constructSslContext(); String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); if (allowedCiphers.length == 0) throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM"); @@ -142,19 +142,6 @@ class ApacheCluster implements Cluster { .build(); } - private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException { - if (builder.sslContext != null) return builder.sslContext; - SslContextBuilder sslContextBuilder = new SslContextBuilder(); - if (builder.certificate != null && builder.privateKey != null) { - sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey); - } - if (builder.caCertificates != null) { - sslContextBuilder.withCaCertificates(builder.caCertificates); - } - return sslContextBuilder.build(); - } - - private static class ApacheHttpResponse implements HttpResponse { private final SimpleHttpResponse wrapped; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java index d0a221ed358..df1f3bcd54c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java @@ -7,8 +7,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.nio.file.Path; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -35,9 +38,12 @@ public class FeedClientBuilder { int maxStreamsPerConnection = 128; FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy; FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Duration.ofSeconds(1), Duration.ofMinutes(10)); - Path certificate; - Path privateKey; - Path caCertificates; + Path certificateFile; + Path privateKeyFile; + Path caCertificatesFile; + Collection<X509Certificate> certificate; + PrivateKey privateKey; + Collection<X509Certificate> caCertificates; public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); } @@ -81,9 +87,6 @@ public class FeedClientBuilder { } public FeedClientBuilder setSslContext(SSLContext context) { - if (certificate != null || caCertificates != null || privateKey != null) { - throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates"); - } this.sslContext = requireNonNull(context); return this; } @@ -113,24 +116,77 @@ public class FeedClientBuilder { } public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) { - if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and certificate"); - this.certificate = certificatePemFile; - this.privateKey = privateKeyPemFile; + this.certificateFile = certificatePemFile; + this.privateKeyFile = privateKeyPemFile; + return this; + } + + public FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) { + this.certificate = certificate; + this.privateKey = privateKey; + return this; + } + + public FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey) { + return setCertificate(Collections.singletonList(certificate), privateKey); + } + + public FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile) { + this.caCertificatesFile = caCertificatesFile; return this; } - public FeedClientBuilder setCaCertificates(Path caCertificatesFile) { - if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and CA certificate"); - this.caCertificates = caCertificatesFile; + public FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates) { + this.caCertificates = caCertificates; return this; } public FeedClient build() { try { + validateConfiguration(); return new HttpFeedClient(this); } catch (IOException e) { throw new UncheckedIOException(e); } } + SSLContext constructSslContext() throws IOException { + if (sslContext != null) return sslContext; + SslContextBuilder sslContextBuilder = new SslContextBuilder(); + if (certificateFile != null && privateKeyFile != null) { + sslContextBuilder.withCertificateAndKey(certificateFile, privateKeyFile); + } else if (certificate != null && privateKey != null) { + sslContextBuilder.withCertificateAndKey(certificate, privateKey); + } + if (caCertificatesFile != null) { + sslContextBuilder.withCaCertificates(caCertificatesFile); + } else if (caCertificates != null) { + sslContextBuilder.withCaCertificates(caCertificates); + } + return sslContextBuilder.build(); + } + + private void validateConfiguration() { + if (sslContext != null && ( + certificateFile != null || caCertificatesFile != null || privateKeyFile != null || + certificate != null || caCertificates != null || privateKey != null)) { + throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates"); + } + if (certificate != null && certificateFile != null) { + throw new IllegalArgumentException("Cannot set both certificate directly and as file"); + } + if (privateKey != null && privateKeyFile != null) { + throw new IllegalArgumentException("Cannot set both private key directly and as file"); + } + if (caCertificates != null && caCertificatesFile != null) { + throw new IllegalArgumentException("Cannot set both CA certificates directly and as file"); + } + if (certificate != null && certificate.isEmpty()) { + throw new IllegalArgumentException("Certificate cannot be empty"); + } + if (caCertificates != null && caCertificates.isEmpty()) { + throw new IllegalArgumentException("CA certificates cannot be empty"); + } + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index de32e7abdf5..b3a7aca1808 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -74,6 +74,31 @@ public class JsonFeeder implements Closeable { public static Builder builder(FeedClient client) { return new Builder(client); } + /** Feeds single JSON feed operations on the form + * <pre> + * { + * "id": "id:ns:type::boo", + * "fields": { ... document fields ... } + * } + * </pre> + */ + public CompletableFuture<Result> feedSingle(String json) { + CompletableFuture<Result> result = new CompletableFuture<>(); + try { + SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8)); + parser.next().whenCompleteAsync((operationResult, error) -> { + if (error != null) { + result.completeExceptionally(error); + } else { + result.complete(operationResult); + } + }, resultExecutor); + } catch (Exception e) { + resultExecutor.execute(() -> result.completeExceptionally(e)); + } + return result; + } + /** Feeds a stream containing a JSON array of feed operations on the form * <pre> * [ @@ -288,87 +313,111 @@ public class JsonFeeder implements Closeable { } } + private class SingleOperationParserAndExecutor extends OperationParserAndExecutor { + + private final byte[] json; + + SingleOperationParserAndExecutor(byte[] json) throws IOException { + super(factory.createParser(json), false); + this.json = json; + } + + @Override + String getDocumentJson(long start, long end) { + return new String(json, (int) start, (int) (end - start), UTF_8); + } + } + private abstract class OperationParserAndExecutor { private final JsonParser parser; private final boolean multipleOperations; + private boolean arrayPrefixParsed; protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException { this.parser = parser; this.multipleOperations = multipleOperations; - if (multipleOperations) expect(START_ARRAY); } abstract String getDocumentJson(long start, long end); CompletableFuture<Result> next() throws IOException { - JsonToken token = parser.nextToken(); - if (token == END_ARRAY && multipleOperations) return null; - else if (token == null && !multipleOperations) return null; - else if (token == START_OBJECT); - else throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); - long start = 0, end = -1; - OperationType type = null; - DocumentId id = null; - OperationParameters parameters = protoParameters; - loop: while (true) { - switch (parser.nextToken()) { - case FIELD_NAME: - switch (parser.getText()) { - case "id": - case "put": type = PUT; id = readId(); break; - case "update": type = UPDATE; id = readId(); break; - case "remove": type = REMOVE; id = readId(); break; - case "condition": parameters = parameters.testAndSetCondition(readString()); break; - case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; - case "fields": { - expect(START_OBJECT); - start = parser.getTokenLocation().getByteOffset(); - int depth = 1; - while (depth > 0) switch (parser.nextToken()) { - case START_OBJECT: ++depth; break; - case END_OBJECT: --depth; break; + try { + if (multipleOperations && !arrayPrefixParsed){ + expect(START_ARRAY); + arrayPrefixParsed = true; + } + + JsonToken token = parser.nextToken(); + if (token == END_ARRAY && multipleOperations) return null; + else if (token == null && !multipleOperations) return null; + else if (token == START_OBJECT); + else throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); + long start = 0, end = -1; + OperationType type = null; + DocumentId id = null; + OperationParameters parameters = protoParameters; + loop: while (true) { + switch (parser.nextToken()) { + case FIELD_NAME: + switch (parser.getText()) { + case "id": + case "put": type = PUT; id = readId(); break; + case "update": type = UPDATE; id = readId(); break; + case "remove": type = REMOVE; id = readId(); break; + case "condition": parameters = parameters.testAndSetCondition(readString()); break; + case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; + case "fields": { + expect(START_OBJECT); + start = parser.getTokenLocation().getByteOffset(); + int depth = 1; + while (depth > 0) switch (parser.nextToken()) { + case START_OBJECT: ++depth; break; + case END_OBJECT: --depth; break; + } + end = parser.getTokenLocation().getByteOffset() + 1; + break; } - end = parser.getTokenLocation().getByteOffset() + 1; - break; + default: throw new JsonParseException("Unexpected field name '" + parser.getText() + "' at offset " + + parser.getTokenLocation().getByteOffset()); } - default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - break; + break; - case END_OBJECT: - break loop; + case END_OBJECT: + break loop; - default: - throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); + default: + throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } } - } - if (id == null) - throw new IllegalArgumentException("No document id for document at offset " + start); - - if (end < start) - throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); - String payload = getDocumentJson(start, end); - switch (type) { - case PUT: return client.put (id, payload, parameters); - case UPDATE: return client.update(id, payload, parameters); - case REMOVE: return client.remove(id, parameters); - default: throw new IllegalStateException("Unexpected operation type '" + type + "'"); + if (id == null) + throw new JsonParseException("No document id for document at offset " + start); + + if (end < start) + throw new JsonParseException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); + String payload = getDocumentJson(start, end); + switch (type) { + case PUT: return client.put (id, payload, parameters); + case UPDATE: return client.update(id, payload, parameters); + case REMOVE: return client.remove(id, parameters); + default: throw new JsonParseException("Unexpected operation type '" + type + "'"); + } + } catch (com.fasterxml.jackson.core.JacksonException e) { + throw new JsonParseException("Failed to parse JSON", e); } } - void expect(JsonToken token) throws IOException { + private void expect(JsonToken token) throws IOException { if (parser.nextToken() != token) - throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + throw new JsonParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); } private String readString() throws IOException { String value = parser.nextTextValue(); if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + + throw new JsonParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; @@ -377,7 +426,7 @@ public class JsonFeeder implements Closeable { private boolean readBoolean() throws IOException { Boolean value = parser.nextBooleanValue(); if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + + throw new JsonParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java new file mode 100644 index 00000000000..785ffd8eb4c --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java @@ -0,0 +1,13 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +/** + * @author bjorncs + */ +public class JsonParseException extends FeedException { + + public JsonParseException(String message) { super(message); } + + public JsonParseException(String message, Throwable cause) { super(message, cause); } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java index 7200d5fd943..9114e22f4a6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java @@ -20,11 +20,14 @@ import java.nio.file.Path; import java.security.GeneralSecurityException; import java.security.KeyFactory; import java.security.KeyStore; +import java.security.KeyStoreException; import java.security.PrivateKey; import java.security.cert.Certificate; import java.security.cert.X509Certificate; import java.security.spec.PKCS8EncodedKeySpec; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; /** @@ -39,6 +42,9 @@ class SslContextBuilder { private Path certificateFile; private Path privateKeyFile; private Path caCertificatesFile; + private Collection<X509Certificate> certificate; + private PrivateKey privateKey; + private Collection<X509Certificate> caCertificates; SslContextBuilder withCertificateAndKey(Path certificate, Path privateKey) { this.certificateFile = certificate; @@ -46,20 +52,35 @@ class SslContextBuilder { return this; } + SslContextBuilder withCertificateAndKey(Collection<X509Certificate> certificate, PrivateKey privateKey) { + this.certificate = certificate; + this.privateKey = privateKey; + return this; + } + SslContextBuilder withCaCertificates(Path caCertificates) { this.caCertificatesFile = caCertificates; return this; } + SslContextBuilder withCaCertificates(Collection<X509Certificate> caCertificates) { + this.caCertificates = caCertificates; + return this; + } + SSLContext build() throws IOException { try { KeyStore keystore = KeyStore.getInstance("PKCS12"); keystore.load(null); if (certificateFile != null && privateKeyFile != null) { keystore.setKeyEntry("cert", privateKey(privateKeyFile), new char[0], certificates(certificateFile)); + } else if (certificate != null && privateKey != null) { + keystore.setKeyEntry("cert", privateKey, new char[0], certificate.toArray(new Certificate[0])); } if (caCertificatesFile != null) { - keystore.setCertificateEntry("ca-cert", certificates(caCertificatesFile)[0]); + addCaCertificates(keystore, Arrays.asList(certificates(caCertificatesFile))); + } else if (caCertificates != null) { + addCaCertificates(keystore, caCertificates); } KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(keystore, new char[0]); @@ -73,6 +94,13 @@ class SslContextBuilder { } } + private static void addCaCertificates(KeyStore keystore, Collection<? extends Certificate> certificates) throws KeyStoreException { + int i = 0; + for (Certificate cert : certificates) { + keystore.setCertificateEntry("ca-cert-" + ++i, cert); + } + } + private static Certificate[] certificates(Path file) throws IOException, GeneralSecurityException { try (PEMParser parser = new PEMParser(Files.newBufferedReader(file))) { List<X509Certificate> result = new ArrayList<>(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java index e2ae5bc7155..03194e23d47 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -11,6 +11,7 @@ import java.nio.file.StandardOpenOption; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -51,52 +52,72 @@ class JsonFeederTest { try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) { AtomicInteger resultsReceived = new AtomicInteger(); AtomicBoolean completedSuccessfully = new AtomicBoolean(); - Set<String> ids = new HashSet<>(); long startNanos = System.nanoTime(); - JsonFeeder.builder(new FeedClient() { + SimpleClient feedClient = new SimpleClient(); + JsonFeeder.builder(feedClient).build() + .feedMany(in, 1 << 7, + new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document + @Override + public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); } - @Override - public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { - ids.add(documentId.userSpecific()); - return createSuccessResult(documentId); - } + @Override + public void onError(Throwable error) { exceptionThrow.set(error); } - @Override - public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { - return createSuccessResult(documentId); - } + @Override + public void onComplete() { completedSuccessfully.set(true); } + }) + .join(); - @Override - public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { - return createSuccessResult(documentId); - } + System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + assertEquals(docs + 1, feedClient.ids.size()); + assertEquals(docs + 1, resultsReceived.get()); + assertTrue(completedSuccessfully.get()); + assertNull(exceptionThrow.get()); + } + } - @Override - public OperationStats stats() { return null; } + @Test + public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException { + try (JsonFeeder feeder = JsonFeeder.builder(new SimpleClient()).build()) { + String json = "{\"put\": \"id:ns:type::abc1\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " }\n"; + Result result = feeder.feedSingle(json).get(); + assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId()); + assertEquals(Result.Type.success, result.type()); + assertEquals("success", result.resultMessage().get()); + } + } - @Override - public void close(boolean graceful) { } + private static class SimpleClient implements FeedClient { + final Set<String> ids = new HashSet<>(); - private CompletableFuture<Result> createSuccessResult(DocumentId documentId) { - return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); - } + @Override + public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { + ids.add(documentId.userSpecific()); + return createSuccessResult(documentId); + } - }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document - @Override - public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); } + @Override + public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { + return createSuccessResult(documentId); + } - @Override - public void onError(Throwable error) { exceptionThrow.set(error); } + @Override + public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { + return createSuccessResult(documentId); + } - @Override - public void onComplete() { completedSuccessfully.set(true); } - }).join(); + @Override + public OperationStats stats() { return null; } - System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); - assertEquals(docs + 1, ids.size()); - assertEquals(docs + 1, resultsReceived.get()); - assertTrue(completedSuccessfully.get()); - assertNull(exceptionThrow.get()); + @Override + public void close(boolean graceful) { } + + private CompletableFuture<Result> createSuccessResult(DocumentId documentId) { + return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); } } diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml index 382c28dc884..39f10d84f9b 100644 --- a/vespa-hadoop/pom.xml +++ b/vespa-hadoop/pom.xml @@ -101,17 +101,22 @@ <scope>test</scope> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> <scope>test</scope> </dependency> - <!-- Vespa feeding dependencies --> <dependency> <groupId>com.yahoo.vespa</groupId> <artifactId>vespa-http-client</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>vespa-feed-client</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> </dependencies> diff --git a/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java new file mode 100644 index 00000000000..5974a8df271 --- /dev/null +++ b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java @@ -0,0 +1,18 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.Result.Type; + +/** + * Workaround for package-private {@link Result} constructor. + * + * @author bjorncs + */ +public class DryrunResult { + + private DryrunResult() {} + + public static Result create(Type type, DocumentId documentId, String resultMessage, String traceMessage) { + return new Result(type, documentId, resultMessage, traceMessage); + } +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java new file mode 100644 index 00000000000..b716c55beb5 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java @@ -0,0 +1,235 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hadoop.mapreduce; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; +import com.yahoo.vespa.http.client.FeedClient; +import com.yahoo.vespa.http.client.FeedClientFactory; +import com.yahoo.vespa.http.client.Result; +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.FeedParams; +import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; +import com.yahoo.vespa.http.client.config.SessionParams; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import javax.xml.namespace.QName; +import javax.xml.stream.FactoryConfigurationError; +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.events.StartElement; +import javax.xml.stream.events.XMLEvent; +import java.io.IOException; +import java.io.StringReader; +import java.time.Duration; +import java.util.List; +import java.util.StringTokenizer; +import java.util.concurrent.ThreadLocalRandom; +import java.util.logging.Logger; + +/** + * {@link LegacyVespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-http-client. + * + * @author lesters + */ +@SuppressWarnings("rawtypes") +public class LegacyVespaRecordWriter extends RecordWriter { + + private final static Logger log = Logger.getLogger(LegacyVespaRecordWriter.class.getCanonicalName()); + + private boolean initialized = false; + private FeedClient feedClient; + private final VespaCounters counters; + private final int progressInterval; + + final VespaConfiguration configuration; + + LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { + this.counters = counters; + this.configuration = configuration; + this.progressInterval = configuration.progressInterval(); + } + + + @Override + public void write(Object key, Object data) throws IOException, InterruptedException { + if (!initialized) { + initialize(); + } + + String doc = data.toString().trim(); + + // Parse data to find document id - if none found, skip this write + String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc) + : findDocIdFromXml(doc); + if (docId != null && docId.length() >= 0) { + feedClient.stream(docId, doc); + counters.incrementDocumentsSent(1); + } else { + counters.incrementDocumentsSkipped(1); + } + + if (counters.getDocumentsSent() % progressInterval == 0) { + String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", + counters.getDocumentsSent(), + counters.getDocumentsOk(), + counters.getDocumentsFailed(), + counters.getDocumentsSkipped()); + log.info(progress); + } + + } + + + @Override + public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + if (feedClient != null) { + feedClient.close(); + } + } + + protected ConnectionParams.Builder configureConnectionParams() { + ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); + connParamsBuilder.setDryRun(configuration.dryrun()); + connParamsBuilder.setUseCompression(configuration.useCompression()); + connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); + connParamsBuilder.setMaxRetries(configuration.numRetries()); + if (configuration.proxyHost() != null) { + connParamsBuilder.setProxyHost(configuration.proxyHost()); + } + if (configuration.proxyPort() >= 0) { + connParamsBuilder.setProxyPort(configuration.proxyPort()); + } + return connParamsBuilder; + } + + protected FeedParams.Builder configureFeedParams() { + FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); + feedParamsBuilder.setDataFormat(configuration.dataFormat()); + feedParamsBuilder.setRoute(configuration.route()); + feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); + feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); + feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis()); + return feedParamsBuilder; + } + + protected SessionParams.Builder configureSessionParams() { + SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder(); + sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize()); + sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2); + return sessionParamsBuilder; + } + + private void initialize() { + if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) { + int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs()); + log.info("VespaStorage: Delaying startup by " + delay + " ms"); + try { + Thread.sleep(delay); + } catch (Exception e) {} + } + + ConnectionParams.Builder connParamsBuilder = configureConnectionParams(); + FeedParams.Builder feedParamsBuilder = configureFeedParams(); + SessionParams.Builder sessionParams = configureSessionParams(); + + sessionParams.setConnectionParams(connParamsBuilder.build()); + sessionParams.setFeedParams(feedParamsBuilder.build()); + + String endpoints = configuration.endpoint(); + StringTokenizer tokenizer = new StringTokenizer(endpoints, ","); + while (tokenizer.hasMoreTokens()) { + String endpoint = tokenizer.nextToken().trim(); + sessionParams.addCluster(new Cluster.Builder().addEndpoint( + Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL()) + ).build()); + } + + ResultCallback resultCallback = new ResultCallback(counters); + feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback); + + initialized = true; + log.info("VespaStorage configuration:\n" + configuration.toString()); + log.info(feedClient.getStatsAsJson()); + } + + private String findDocIdFromXml(String xml) { + try { + XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); + while (eventReader.hasNext()) { + XMLEvent event = eventReader.nextEvent(); + if (event.getEventType() == XMLEvent.START_ELEMENT) { + StartElement element = event.asStartElement(); + String elementName = element.getName().getLocalPart(); + if (VespaDocumentOperation.Operation.valid(elementName)) { + return element.getAttributeByName(QName.valueOf("documentid")).getValue(); + } + } + } + } catch (XMLStreamException | FactoryConfigurationError e) { + // as json dude does + return null; + } + return null; + } + + private String findDocId(String json) throws IOException { + JsonFactory factory = new JsonFactory(); + try(JsonParser parser = factory.createParser(json)) { + if (parser.nextToken() != JsonToken.START_OBJECT) { + return null; + } + while (parser.nextToken() != JsonToken.END_OBJECT) { + String fieldName = parser.getCurrentName(); + parser.nextToken(); + if (VespaDocumentOperation.Operation.valid(fieldName)) { + String docId = parser.getText(); + return docId; + } else { + parser.skipChildren(); + } + } + } catch (JsonParseException ex) { + return null; + } + return null; + } + + + static class ResultCallback implements FeedClient.ResultCallback { + final VespaCounters counters; + + public ResultCallback(VespaCounters counters) { + this.counters = counters; + } + + @Override + public void onCompletion(String docId, Result documentResult) { + if (!documentResult.isSuccess()) { + counters.incrementDocumentsFailed(1); + StringBuilder sb = new StringBuilder(); + sb.append("Problems with docid "); + sb.append(docId); + sb.append(": "); + List<Result.Detail> details = documentResult.getDetails(); + for (Result.Detail detail : details) { + sb.append(detail.toString()); + sb.append(" "); + } + log.warning(sb.toString()); + return; + } + counters.incrementDocumentsOk(1); + } + + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java index bef51e9ae08..97bc7dc838e 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java @@ -10,7 +10,7 @@ import java.util.Properties; /** * An output specification for writing to Vespa instances in a Map-Reduce job. - * Mainly returns an instance of a {@link VespaRecordWriter} that does the + * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the * actual feeding to Vespa. * * @author lesters @@ -35,7 +35,9 @@ public class VespaOutputFormat extends OutputFormat { public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { VespaCounters counters = VespaCounters.get(context); VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride); - return new VespaRecordWriter(configuration, counters); + return configuration.useLegacyClient() + ? new LegacyVespaRecordWriter(configuration, counters) + : new VespaRecordWriter(configuration, counters); } diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index 4cc93bfd538..73e10c39419 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -1,83 +1,71 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.mapreduce; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.DryrunResult; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.JsonFeeder; +import ai.vespa.feed.client.JsonParseException; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.Result; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; -import com.yahoo.vespa.http.client.FeedClient; -import com.yahoo.vespa.http.client.FeedClientFactory; -import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; -import com.yahoo.vespa.http.client.config.SessionParams; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import javax.xml.namespace.QName; -import javax.xml.stream.FactoryConfigurationError; -import javax.xml.stream.XMLEventReader; -import javax.xml.stream.XMLInputFactory; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.events.StartElement; -import javax.xml.stream.events.XMLEvent; import java.io.IOException; -import java.io.StringReader; +import java.net.URI; import java.time.Duration; +import java.util.Arrays; import java.util.List; -import java.util.StringTokenizer; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Logger; +import static java.util.stream.Collectors.toList; + /** - * VespaRecordWriter sends the output <key, value> to one or more Vespa endpoints. + * {@link VespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-feed-client. * - * @author lesters + * @author bjorncs */ -@SuppressWarnings("rawtypes") -public class VespaRecordWriter extends RecordWriter { +public class VespaRecordWriter extends RecordWriter<Object, Object> { private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName()); - private boolean initialized = false; - private FeedClient feedClient; private final VespaCounters counters; - private final int progressInterval; + private final VespaConfiguration config; - final VespaConfiguration configuration; + private boolean initialized = false; + private JsonFeeder feeder; - VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { + protected VespaRecordWriter(VespaConfiguration config, VespaCounters counters) { this.counters = counters; - this.configuration = configuration; - this.progressInterval = configuration.progressInterval(); + this.config = config; } - @Override - public void write(Object key, Object data) throws IOException, InterruptedException { - if (!initialized) { - initialize(); - } - - String doc = data.toString().trim(); - - // Parse data to find document id - if none found, skip this write - String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc) - : findDocIdFromXml(doc); - if (docId != null && docId.length() >= 0) { - feedClient.stream(docId, doc); - counters.incrementDocumentsSent(1); - } else { - counters.incrementDocumentsSkipped(1); - } - - if (counters.getDocumentsSent() % progressInterval == 0) { + public void write(Object key, Object data) throws IOException { + initializeOnFirstWrite(); + String json = data.toString().trim(); + feeder.feedSingle(json) + .whenComplete((result, error) -> { + if (error != null) { + if (error instanceof JsonParseException) { + counters.incrementDocumentsSkipped(1); + } else { + log.warning("Failed to feed single document: " + error); + counters.incrementDocumentsFailed(1); + } + } else { + counters.incrementDocumentsOk(1); + } + }); + counters.incrementDocumentsSent(1); + if (counters.getDocumentsSent() % config.progressInterval() == 0) { String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", counters.getDocumentsSent(), counters.getDocumentsOk(), @@ -85,151 +73,115 @@ public class VespaRecordWriter extends RecordWriter { counters.getDocumentsSkipped()); log.info(progress); } - } - @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - if (feedClient != null) { - feedClient.close(); + public void close(TaskAttemptContext context) throws IOException { + if (feeder != null) { + feeder.close(); + feeder = null; + initialized = false; } } - protected ConnectionParams.Builder configureConnectionParams() { - ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); - connParamsBuilder.setDryRun(configuration.dryrun()); - connParamsBuilder.setUseCompression(configuration.useCompression()); - connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); - connParamsBuilder.setMaxRetries(configuration.numRetries()); - if (configuration.proxyHost() != null) { - connParamsBuilder.setProxyHost(configuration.proxyHost()); - } - if (configuration.proxyPort() >= 0) { - connParamsBuilder.setProxyPort(configuration.proxyPort()); - } - return connParamsBuilder; - } + /** Override method to alter {@link FeedClient} configuration */ + protected void onFeedClientInitialization(FeedClientBuilder builder) {} - protected FeedParams.Builder configureFeedParams() { - FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); - feedParamsBuilder.setDataFormat(configuration.dataFormat()); - feedParamsBuilder.setRoute(configuration.route()); - feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); - feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); - feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis()); - return feedParamsBuilder; + private void initializeOnFirstWrite() { + if (initialized) return; + validateConfig(); + useRandomizedStartupDelayIfEnabled(); + feeder = createJsonStreamFeeder(); + initialized = true; } - protected SessionParams.Builder configureSessionParams() { - SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder(); - sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize()); - sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2); - return sessionParamsBuilder; + private void validateConfig() { + if (!config.useSSL()) { + throw new IllegalArgumentException("SSL is required for this feed client implementation"); + } + if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) { + throw new IllegalArgumentException("Only JSON is support by this feed client implementation"); + } + if (config.proxyHost() != null) { + log.warning(String.format("Ignoring proxy config (host='%s', port=%d)", config.proxyHost(), config.proxyPort())); + } } - - private void initialize() { - if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) { - int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs()); - log.info("VespaStorage: Delaying startup by " + delay + " ms"); + + private void useRandomizedStartupDelayIfEnabled() { + if (!config.dryrun() && config.randomStartupSleepMs() > 0) { + int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs()); + log.info("Delaying startup by " + delay + " ms"); try { Thread.sleep(delay); } catch (Exception e) {} } + } - ConnectionParams.Builder connParamsBuilder = configureConnectionParams(); - FeedParams.Builder feedParamsBuilder = configureFeedParams(); - SessionParams.Builder sessionParams = configureSessionParams(); - - sessionParams.setConnectionParams(connParamsBuilder.build()); - sessionParams.setFeedParams(feedParamsBuilder.build()); - String endpoints = configuration.endpoint(); - StringTokenizer tokenizer = new StringTokenizer(endpoints, ","); - while (tokenizer.hasMoreTokens()) { - String endpoint = tokenizer.nextToken().trim(); - sessionParams.addCluster(new Cluster.Builder().addEndpoint( - Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL()) - ).build()); + private JsonFeeder createJsonStreamFeeder() { + FeedClient feedClient = createFeedClient(); + JsonFeeder.Builder builder = JsonFeeder.builder(feedClient) + .withTimeout(Duration.ofMinutes(10)); + if (config.route() != null) { + builder.withRoute(config.route()); } + return builder.build(); - ResultCallback resultCallback = new ResultCallback(counters); - feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback); - - initialized = true; - log.info("VespaStorage configuration:\n" + configuration.toString()); - log.info(feedClient.getStatsAsJson()); } - private String findDocIdFromXml(String xml) { - try { - XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); - while (eventReader.hasNext()) { - XMLEvent event = eventReader.nextEvent(); - if (event.getEventType() == XMLEvent.START_ELEMENT) { - StartElement element = event.asStartElement(); - String elementName = element.getName().getLocalPart(); - if (VespaDocumentOperation.Operation.valid(elementName)) { - return element.getAttributeByName(QName.valueOf("documentid")).getValue(); - } - } - } - } catch (XMLStreamException | FactoryConfigurationError e) { - // as json dude does - return null; + private FeedClient createFeedClient() { + if (config.dryrun()) { + return new DryrunClient(); + } else { + FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpointUris(config)) + .setConnectionsPerEndpoint(config.numConnections()) + .setMaxStreamPerConnection(streamsPerConnection(config)) + .setRetryStrategy(retryStrategy(config)); + + onFeedClientInitialization(feedClientBuilder); + return feedClientBuilder.build(); } - return null; } - - private String findDocId(String json) throws IOException { - JsonFactory factory = new JsonFactory(); - try(JsonParser parser = factory.createParser(json)) { - if (parser.nextToken() != JsonToken.START_OBJECT) { - return null; - } - while (parser.nextToken() != JsonToken.END_OBJECT) { - String fieldName = parser.getCurrentName(); - parser.nextToken(); - if (VespaDocumentOperation.Operation.valid(fieldName)) { - String docId = parser.getText(); - return docId; - } else { - parser.skipChildren(); - } - } - } catch (JsonParseException ex) { - return null; - } - return null; + + private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) { + int maxRetries = config.numRetries(); + return new FeedClient.RetryStrategy() { + @Override public int retries() { return maxRetries; } + }; } + private static int streamsPerConnection(VespaConfiguration config) { + return Math.min(256, config.maxInFlightRequests() / config.numConnections()); + } + + private static List<URI> endpointUris(VespaConfiguration config) { + return Arrays.stream(config.endpoint().split(",")) + .map(hostname -> URI.create(String.format("https://%s:%d/", hostname, config.defaultPort()))) + .collect(toList()); + } - static class ResultCallback implements FeedClient.ResultCallback { - final VespaCounters counters; + private static class DryrunClient implements FeedClient { - public ResultCallback(VespaCounters counters) { - this.counters = counters; + @Override + public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { + return createSuccessResult(documentId); } @Override - public void onCompletion(String docId, Result documentResult) { - if (!documentResult.isSuccess()) { - counters.incrementDocumentsFailed(1); - StringBuilder sb = new StringBuilder(); - sb.append("Problems with docid "); - sb.append(docId); - sb.append(": "); - List<Result.Detail> details = documentResult.getDetails(); - for (Result.Detail detail : details) { - sb.append(detail.toString()); - sb.append(" "); - } - log.warning(sb.toString()); - return; - } - counters.incrementDocumentsOk(1); + public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { + return createSuccessResult(documentId); } - } + @Override + public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { + return createSuccessResult(documentId); + } + + @Override public OperationStats stats() { return null; } + @Override public void close(boolean graceful) {} + private static CompletableFuture<Result> createSuccessResult(DocumentId documentId) { + return CompletableFuture.completedFuture(DryrunResult.create(Result.Type.success, documentId, "ok", null)); + } + } } diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java index 2a1179dbec6..7219e621486 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java @@ -27,6 +27,7 @@ public class VespaConfiguration { public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests"; public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms"; public static final String NUM_RETRIES = "vespa.feed.num.retries"; + public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient"; private final Configuration conf; private final Properties override; @@ -130,6 +131,7 @@ public class VespaConfiguration { return getInt(PROGRESS_REPORT, 1000); } + public boolean useLegacyClient() { return getBoolean(USE_LEGACY_CLIENT, true); } public String getString(String name) { if (override != null && override.containsKey(name)) { diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java index ebb34dbc1b1..fa7965acbc1 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java @@ -4,10 +4,9 @@ package com.yahoo.vespa.hadoop.pig; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -17,22 +16,25 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.test.PathUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.*; -import java.util.*; +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.StringTokenizer; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class MapReduceTest { @@ -44,7 +46,7 @@ public class MapReduceTest { protected static Path metricsJsonPath; protected static Path metricsCsvPath; - @BeforeClass + @BeforeAll public static void setUp() throws IOException { hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath()); @@ -62,7 +64,7 @@ public class MapReduceTest { copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data"); } - @AfterClass + @AfterAll public static void tearDown() throws IOException { Path testDir = new Path(hdfsBaseDir.getParent()); hdfs.delete(testDir, true); @@ -82,7 +84,7 @@ public class MapReduceTest { FileInputFormat.setInputPaths(job, metricsJsonPath); boolean success = job.waitForCompletion(true); - assertTrue("Job Failed", success); + assertTrue(success, "Job Failed"); VespaCounters counters = VespaCounters.get(job); assertEquals(10, counters.getDocumentsSent()); @@ -103,7 +105,7 @@ public class MapReduceTest { FileInputFormat.setInputPaths(job, metricsJsonPath); boolean success = job.waitForCompletion(true); - assertTrue("Job Failed", success); + assertTrue(success, "Job Failed"); VespaCounters counters = VespaCounters.get(job); assertEquals(10, counters.getDocumentsSent()); @@ -125,7 +127,7 @@ public class MapReduceTest { FileInputFormat.setInputPaths(job, metricsCsvPath); boolean success = job.waitForCompletion(true); - assertTrue("Job Failed", success); + assertTrue(success, "Job Failed"); VespaCounters counters = VespaCounters.get(job); assertEquals(10, counters.getDocumentsSent()); diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java index bafeb593e4f..db2fab9b05e 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java @@ -12,9 +12,9 @@ import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -23,22 +23,22 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @SuppressWarnings("serial") public class VespaDocumentOperationTest { private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); private final PrintStream originalOut = System.out; - @Before + @BeforeEach public void setUpStreams() { System.setOut(new PrintStream(outContent)); } - @After + @AfterEach public void restoreStreams() { System.setOut(originalOut); } diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java index 2d55017b13e..b0e2dd32c04 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java @@ -8,12 +8,16 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.data.Tuple; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.net.InetSocketAddress; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; public class VespaQueryTest { diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java index 7ca401a0cc8..3565db37126 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java @@ -1,14 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.pig; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; - +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.mapred.Counters; @@ -18,11 +12,14 @@ import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.tools.pigstats.JobStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class VespaStorageTest { @@ -51,6 +48,13 @@ public class VespaStorageTest { assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig"); } + @Test + public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString()); + conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString()); + assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf); + } @Test public void requireThatCreateOperationsFeedSucceeds() throws Exception { diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java index 27080a8b2af..93e6a0abfdd 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java @@ -6,11 +6,11 @@ import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.io.IOException; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; public class TupleToolsTest { |