summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java1
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java22
-rw-r--r--jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java79
-rw-r--r--jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java13
-rw-r--r--jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java4
-rw-r--r--metrics/src/tests/summetrictest.cpp41
-rw-r--r--metrics/src/vespa/metrics/metric.cpp7
-rw-r--r--metrics/src/vespa/metrics/metric.h2
-rw-r--r--metrics/src/vespa/metrics/summetric.h1
-rw-r--r--metrics/src/vespa/metrics/summetric.hpp13
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java (renamed from node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java)30
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java6
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java6
-rw-r--r--node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java (renamed from node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java)10
-rw-r--r--node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java24
-rw-r--r--node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json6
-rw-r--r--searchlib/src/tests/predicate/predicate_index_test.cpp20
-rw-r--r--searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp56
-rw-r--r--searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/common/bitvectorcache.h1
-rw-r--r--searchlib/src/vespa/searchlib/predicate/predicate_index.h5
-rw-r--r--searchlib/src/vespa/searchlib/predicate/simple_index.h2
-rw-r--r--searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h31
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp2
-rw-r--r--vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java11
-rw-r--r--vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java15
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java80
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java157
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java13
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java30
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java91
-rw-r--r--vespa-hadoop/pom.xml11
-rw-r--r--vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java18
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java235
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java6
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java288
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java2
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java34
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java18
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java10
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java26
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java4
43 files changed, 1010 insertions, 425 deletions
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java
index bd6c2607fc2..c9f6f67df6c 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/horizon/HorizonClient.java
@@ -1,7 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.controller.api.integration.horizon;
-import com.yahoo.slime.Slime;
import java.io.InputStream;
/**
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java
index 422b8f22000..be8613f5eff 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/horizon/HorizonApiHandler.java
@@ -16,7 +16,9 @@ import com.yahoo.yolean.Exceptions;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UncheckedIOException;
import java.util.Optional;
+import java.util.function.Supplier;
import java.util.logging.Level;
/**
@@ -57,10 +59,10 @@ public class HorizonApiHandler extends LoggingRequestHandler {
private HttpResponse get(HttpRequest request) {
Path path = new Path(request.getUri());
- if (path.matches("/horizon/v1/config/dashboard/topFolders")) return new JsonInputStreamResponse(client.getTopFolders());
- if (path.matches("/horizon/v1/config/dashboard/file/{id}")) return new JsonInputStreamResponse(client.getDashboard(path.get("id")));
- if (path.matches("/horizon/v1/config/dashboard/favorite")) return new JsonInputStreamResponse(client.getFavorite(request.getProperty("user")));
- if (path.matches("/horizon/v1/config/dashboard/recent")) return new JsonInputStreamResponse(client.getRecent(request.getProperty("user")));
+ if (path.matches("/horizon/v1/config/dashboard/topFolders")) return jsonInputStreamResponse(client::getTopFolders);
+ if (path.matches("/horizon/v1/config/dashboard/file/{id}")) return jsonInputStreamResponse(() -> client.getDashboard(path.get("id")));
+ if (path.matches("/horizon/v1/config/dashboard/favorite")) return jsonInputStreamResponse(() -> client.getFavorite(request.getProperty("user")));
+ if (path.matches("/horizon/v1/config/dashboard/recent")) return jsonInputStreamResponse(() -> client.getRecent(request.getProperty("user")));
return ErrorResponse.notFoundError("Nothing at " + path);
}
@@ -73,7 +75,7 @@ public class HorizonApiHandler extends LoggingRequestHandler {
private HttpResponse put(HttpRequest request) {
Path path = new Path(request.getUri());
- if (path.matches("/horizon/v1/config/user")) return new JsonInputStreamResponse(client.getUser());
+ if (path.matches("/horizon/v1/config/user")) return jsonInputStreamResponse(client::getUser);
return ErrorResponse.notFoundError("Nothing at " + path);
}
@@ -81,7 +83,7 @@ public class HorizonApiHandler extends LoggingRequestHandler {
SecurityContext securityContext = getAttribute(request, SecurityContext.ATTRIBUTE_NAME, SecurityContext.class);
try {
byte[] data = TsdbQueryRewriter.rewrite(request.getData().readAllBytes(), securityContext.roles(), systemName);
- return new JsonInputStreamResponse(client.getMetrics(data));
+ return jsonInputStreamResponse(() -> client.getMetrics(data));
} catch (TsdbQueryRewriter.UnauthorizedException e) {
return ErrorResponse.forbidden("Access denied");
} catch (IOException e) {
@@ -96,6 +98,14 @@ public class HorizonApiHandler extends LoggingRequestHandler {
.orElseThrow(() -> new IllegalArgumentException("Attribute '" + attributeName + "' was not set on request"));
}
+ private JsonInputStreamResponse jsonInputStreamResponse(Supplier<InputStream> supplier) {
+ try (InputStream inputStream = supplier.get()) {
+ return new JsonInputStreamResponse(inputStream);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
private static class JsonInputStreamResponse extends HttpResponse {
private final InputStream jsonInputStream;
diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java
index dd4b62ee494..6136bcdfd3a 100644
--- a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java
+++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilter.java
@@ -3,12 +3,14 @@ package com.yahoo.jdisc.http.filter.security.athenz;
import com.google.inject.Inject;
import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.http.HttpRequest;
import com.yahoo.jdisc.http.filter.DiscFilterRequest;
import com.yahoo.jdisc.http.filter.security.athenz.RequestResourceMapper.ResourceNameAndAction;
import com.yahoo.jdisc.http.filter.security.base.JsonSecurityRequestFilterBase;
import com.yahoo.vespa.athenz.api.AthenzAccessToken;
import com.yahoo.vespa.athenz.api.AthenzIdentity;
import com.yahoo.vespa.athenz.api.AthenzPrincipal;
+import com.yahoo.vespa.athenz.api.AthenzRole;
import com.yahoo.vespa.athenz.api.ZToken;
import com.yahoo.vespa.athenz.tls.AthenzX509CertificateUtils;
import com.yahoo.vespa.athenz.utils.AthenzIdentities;
@@ -20,6 +22,7 @@ import java.security.cert.X509Certificate;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
@@ -56,16 +59,20 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
private final RequestResourceMapper requestResourceMapper;
private final Metric metric;
private final Set<AthenzIdentity> allowedProxyIdentities;
+ private final Optional<AthenzRole> readRole;
+ private final Optional<AthenzRole> writeRole;
@Inject
public AthenzAuthorizationFilter(AthenzAuthorizationFilterConfig config, RequestResourceMapper resourceMapper, Metric metric) {
- this(config, resourceMapper, new DefaultZpe(), metric);
+ this(config, resourceMapper, new DefaultZpe(), metric, null, null);
}
public AthenzAuthorizationFilter(AthenzAuthorizationFilterConfig config,
RequestResourceMapper resourceMapper,
Zpe zpe,
- Metric metric) {
+ Metric metric,
+ AthenzRole readRole,
+ AthenzRole writeRole) {
this.roleTokenHeaderName = config.roleTokenHeaderName();
List<EnabledCredentials.Enum> enabledCredentials = config.enabledCredentials();
this.enabledCredentials = enabledCredentials.isEmpty()
@@ -77,6 +84,8 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
this.allowedProxyIdentities = config.allowedProxyIdentities().stream()
.map(AthenzIdentities::from)
.collect(Collectors.toSet());
+ this.readRole = Optional.ofNullable(readRole);
+ this.writeRole = Optional.ofNullable(writeRole);
}
@Override
@@ -86,7 +95,7 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
requestResourceMapper.getResourceNameAndAction(request);
log.log(Level.FINE, () -> String.format("Resource mapping for '%s': %s", request, resourceMapping));
if (resourceMapping.isEmpty()) {
- incrementAcceptedMetrics(request, false);
+ incrementAcceptedMetrics(request, false, Optional.empty());
return Optional.empty();
}
Result result = checkAccessAllowed(request, resourceMapping.get());
@@ -94,15 +103,15 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
setAttribute(request, RESULT_ATTRIBUTE, resultType.name());
if (resultType == AuthorizationResult.Type.ALLOW) {
populateRequestWithResult(request, result);
- incrementAcceptedMetrics(request, true);
+ incrementAcceptedMetrics(request, true, Optional.of(result));
return Optional.empty();
}
log.log(Level.FINE, () -> String.format("Forbidden (403) for '%s': %s", request, resultType.name()));
- incrementRejectedMetrics(request, FORBIDDEN, resultType.name());
+ incrementRejectedMetrics(request, FORBIDDEN, resultType.name(), Optional.of(result));
return Optional.of(new ErrorResponse(FORBIDDEN, "Access forbidden: " + resultType.getDescription()));
} catch (IllegalArgumentException e) {
log.log(Level.FINE, () -> String.format("Unauthorized (401) for '%s': %s", request, e.getMessage()));
- incrementRejectedMetrics(request, UNAUTHORIZED, "Unauthorized");
+ incrementRejectedMetrics(request, UNAUTHORIZED, "Unauthorized", Optional.empty());
return Optional.of(new ErrorResponse(UNAUTHORIZED, e.getMessage()));
}
}
@@ -130,33 +139,53 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
X509Certificate identityCertificate = getClientCertificate(request).get();
AthenzIdentity peerIdentity = AthenzIdentities.from(identityCertificate);
if (allowedProxyIdentities.contains(peerIdentity)) {
- return checkAccessWithProxiedAccessToken(resourceAndAction, accessToken, identityCertificate);
+ return checkAccessWithProxiedAccessToken(request, resourceAndAction, accessToken, identityCertificate);
} else {
var zpeResult = zpe.checkAccessAllowed(
accessToken, identityCertificate, resourceAndAction.resourceName(), resourceAndAction.action());
- return new Result(ACCESS_TOKEN, peerIdentity, zpeResult);
+ return getResult(ACCESS_TOKEN, peerIdentity, zpeResult, request, resourceAndAction, mapToRequestPrivileges(accessToken.roles()));
}
}
- private Result checkAccessWithProxiedAccessToken(ResourceNameAndAction resourceAndAction, AthenzAccessToken accessToken, X509Certificate identityCertificate) {
+ private Result getResult(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult, DiscFilterRequest request, ResourceNameAndAction resourceAndAction, List<String> privileges) {
+ String currentAction = resourceAndAction.action();
+ String futureAction = resourceAndAction.futureAction();
+ return new Result(credentialType, identity, zpeResult, privileges, currentAction, futureAction);
+ }
+
+ private List<String> mapToRequestPrivileges(List<AthenzRole> roles) {
+ return roles.stream()
+ .map(this::rolePrivilege)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ private String rolePrivilege(AthenzRole role) {
+ if (readRole.stream().anyMatch(role::equals)) return "read";
+ if (writeRole.stream().anyMatch(role::equals)) return "write";
+ return null;
+ }
+
+ private Result checkAccessWithProxiedAccessToken(DiscFilterRequest request, ResourceNameAndAction resourceAndAction, AthenzAccessToken accessToken, X509Certificate identityCertificate) {
AthenzIdentity proxyIdentity = AthenzIdentities.from(identityCertificate);
log.log(Level.FINE,
() -> String.format("Checking proxied access token. Proxy identity: '%s'. Allowed identities: %s", proxyIdentity, allowedProxyIdentities));
var zpeResult = zpe.checkAccessAllowed(accessToken, resourceAndAction.resourceName(), resourceAndAction.action());
- return new Result(ACCESS_TOKEN, AthenzIdentities.from(identityCertificate), zpeResult);
+ return getResult(ACCESS_TOKEN, AthenzIdentities.from(identityCertificate), zpeResult, request, resourceAndAction, mapToRequestPrivileges(accessToken.roles()));
}
private Result checkAccessWithRoleCertificate(DiscFilterRequest request, ResourceNameAndAction resourceAndAction) {
X509Certificate roleCertificate = getClientCertificate(request).get();
var zpeResult = zpe.checkAccessAllowed(roleCertificate, resourceAndAction.resourceName(), resourceAndAction.action());
AthenzIdentity identity = AthenzX509CertificateUtils.getIdentityFromRoleCertificate(roleCertificate);
- return new Result(ROLE_CERTIFICATE, identity, zpeResult);
+ AthenzX509CertificateUtils.getRolesFromRoleCertificate(roleCertificate).roleName();
+ return getResult(ROLE_CERTIFICATE, identity, zpeResult, request, resourceAndAction, mapToRequestPrivileges(List.of(AthenzX509CertificateUtils.getRolesFromRoleCertificate(roleCertificate))));
}
private Result checkAccessWithRoleToken(DiscFilterRequest request, ResourceNameAndAction resourceAndAction) {
ZToken roleToken = getRoleToken(request);
var zpeResult = zpe.checkAccessAllowed(roleToken, resourceAndAction.resourceName(), resourceAndAction.action());
- return new Result(ROLE_TOKEN, roleToken.getIdentity(), zpeResult);
+ return getResult(ROLE_TOKEN, roleToken.getIdentity(), zpeResult, request, resourceAndAction, mapToRequestPrivileges(roleToken.getRoles()));
}
private static boolean isAccessTokenPresent(DiscFilterRequest request) {
@@ -246,20 +275,30 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
request.setAttribute(name, value);
}
- private void incrementAcceptedMetrics(DiscFilterRequest request, boolean authzRequired) {
+ private void incrementAcceptedMetrics(DiscFilterRequest request, boolean authzRequired, Optional<Result> result) {
String hostHeader = request.getHeader("Host");
Metric.Context context = metric.createContext(Map.of(
"endpoint", hostHeader != null ? hostHeader : "",
- "authz-required", Boolean.toString(authzRequired)));
+ "authz-required", Boolean.toString(authzRequired),
+ "httpMethod", HttpRequest.Method.valueOf(request.getMethod()).name(),
+ "requestPrivileges", result.map(r -> String.join(",", r.requestPrivileges)).orElse(""),
+ "currentRequestMapping", result.map(r -> r.currentAction).orElse(""),
+ "futureRequestMapping", result.map(r -> r.futureAction).orElse("")
+ ));
metric.add(ACCEPTED_METRIC_NAME, 1L, context);
}
- private void incrementRejectedMetrics(DiscFilterRequest request, int statusCode, String zpeCode) {
+ private void incrementRejectedMetrics(DiscFilterRequest request, int statusCode, String zpeCode, Optional<Result> result) {
String hostHeader = request.getHeader("Host");
Metric.Context context = metric.createContext(Map.of(
"endpoint", hostHeader != null ? hostHeader : "",
"status-code", Integer.toString(statusCode),
- "zpe-status", zpeCode));
+ "zpe-status", zpeCode,
+ "httpMethod", HttpRequest.Method.valueOf(request.getMethod()),
+ "requestPrivileges", result.map(r -> String.join(",", r.requestPrivileges)).orElse(""),
+ "currentRequestMapping", result.map(r -> r.currentAction).orElse(""),
+ "futureRequestMapping", result.map(r -> r.futureAction).orElse("")
+ ));
metric.add(REJECTED_METRIC_NAME, 1L, context);
}
@@ -267,11 +306,17 @@ public class AthenzAuthorizationFilter extends JsonSecurityRequestFilterBase {
final EnabledCredentials.Enum credentialType;
final AthenzIdentity identity;
final AuthorizationResult zpeResult;
+ final List<String> requestPrivileges;
+ final String currentAction;
+ final String futureAction;
- Result(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult) {
+ public Result(EnabledCredentials.Enum credentialType, AthenzIdentity identity, AuthorizationResult zpeResult, List<String> requestPrivileges, String currentAction, String futureAction) {
this.credentialType = credentialType;
this.identity = identity;
this.zpeResult = zpeResult;
+ this.requestPrivileges = requestPrivileges;
+ this.currentAction = currentAction;
+ this.futureAction = futureAction;
}
}
}
diff --git a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java
index 56c52bd71c4..c962e973959 100644
--- a/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java
+++ b/jdisc-security-filters/src/main/java/com/yahoo/jdisc/http/filter/security/athenz/RequestResourceMapper.java
@@ -28,10 +28,15 @@ public interface RequestResourceMapper {
class ResourceNameAndAction {
private final AthenzResourceName resourceName;
private final String action;
+ private final String futureAction;
public ResourceNameAndAction(AthenzResourceName resourceName, String action) {
+ this(resourceName, action, action);
+ }
+ public ResourceNameAndAction(AthenzResourceName resourceName, String action, String futureAction) {
this.resourceName = resourceName;
this.action = action;
+ this.futureAction = futureAction;
}
public AthenzResourceName resourceName() {
@@ -42,6 +47,14 @@ public interface RequestResourceMapper {
return action;
}
+ public ResourceNameAndAction withFutureAction(String futureAction) {
+ return new ResourceNameAndAction(resourceName, action, futureAction);
+ }
+
+ public String futureAction() {
+ return futureAction;
+ }
+
@Override
public String toString() {
return "ResourceNameAndAction{" +
diff --git a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java
index bfe02d1f279..137e4653670 100644
--- a/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java
+++ b/jdisc-security-filters/src/test/java/com/yahoo/jdisc/http/filter/security/athenz/AthenzAuthorizationFilterTest.java
@@ -296,7 +296,9 @@ public class AthenzAuthorizationFilterTest {
.allowedProxyIdentities(allowedProxyIdentities)),
new StaticRequestResourceMapper(RESOURCE_NAME, ACTION),
zpe,
- metric);
+ metric,
+ new AthenzRole("domain","reader"),
+ new AthenzRole("domain", "writer"));
}
private static void assertAuthorizationResult(DiscFilterRequest request, Type expectedResult) {
diff --git a/metrics/src/tests/summetrictest.cpp b/metrics/src/tests/summetrictest.cpp
index e3d58659daf..d0380a630f1 100644
--- a/metrics/src/tests/summetrictest.cpp
+++ b/metrics/src/tests/summetrictest.cpp
@@ -125,4 +125,45 @@ TEST(SumMetricTest, test_start_value)
EXPECT_EQ(int64_t(60), sum.getLongValue("value"));
}
+namespace {
+
+struct MetricSetWithSum : public MetricSet
+{
+ LongValueMetric _v1;
+ LongValueMetric _v2;
+ SumMetric<LongValueMetric> _sum;
+ MetricSetWithSum();
+ ~MetricSetWithSum() override;
+};
+
+MetricSetWithSum::MetricSetWithSum()
+ : MetricSet("MetricSetWithSum", {}, ""),
+ _v1("v1", {}, "", this),
+ _v2("v2", {}, "", this),
+ _sum("sum", {}, "", this)
+{
+ _sum.addMetricToSum(_v1);
+ _sum.addMetricToSum(_v2);
+}
+
+MetricSetWithSum::~MetricSetWithSum() = default;
+
+}
+
+TEST(SumMetricTest, test_nested_sum)
+{
+ MetricSetWithSum w1;
+ MetricSetWithSum w2;
+ MetricSetWithSum sum;
+ w1._v1.addValue(10);
+ w1._v2.addValue(13);
+ w2._v1.addValue(27);
+ w2._v2.addValue(29);
+ w1.addToPart(sum);
+ w2.addToPart(sum);
+ EXPECT_EQ(int64_t(37), sum._v1.getLongValue("value"));
+ EXPECT_EQ(int64_t(42), sum._v2.getLongValue("value"));
+ EXPECT_EQ(int64_t(79), sum._sum.getLongValue("value"));
+}
+
}
diff --git a/metrics/src/vespa/metrics/metric.cpp b/metrics/src/vespa/metrics/metric.cpp
index a8d8194b26d..50fc36c62cb 100644
--- a/metrics/src/vespa/metrics/metric.cpp
+++ b/metrics/src/vespa/metrics/metric.cpp
@@ -232,4 +232,11 @@ Metric::assignValues(const Metric& m) {
assert(ownerList.empty());
return this;
}
+
+bool
+Metric::is_sum_metric() const
+{
+ return false;
+}
+
} // metrics
diff --git a/metrics/src/vespa/metrics/metric.h b/metrics/src/vespa/metrics/metric.h
index 10b74a2da22..c8fb3031278 100644
--- a/metrics/src/vespa/metrics/metric.h
+++ b/metrics/src/vespa/metrics/metric.h
@@ -247,6 +247,8 @@ public:
virtual bool isMetricSet() const { return false; }
+ virtual bool is_sum_metric() const;
+
private:
/**
diff --git a/metrics/src/vespa/metrics/summetric.h b/metrics/src/vespa/metrics/summetric.h
index f04c1696638..7b60c968e5b 100644
--- a/metrics/src/vespa/metrics/summetric.h
+++ b/metrics/src/vespa/metrics/summetric.h
@@ -69,6 +69,7 @@ public:
void printDebug(std::ostream&, const std::string& indent="") const override;
void addToPart(Metric&) const override;
void addToSnapshot(Metric&, std::vector<Metric::UP> &) const override;
+ bool is_sum_metric() const override;
private:
friend struct MetricManagerTest;
diff --git a/metrics/src/vespa/metrics/summetric.hpp b/metrics/src/vespa/metrics/summetric.hpp
index 9520456a974..e067b9643c2 100644
--- a/metrics/src/vespa/metrics/summetric.hpp
+++ b/metrics/src/vespa/metrics/summetric.hpp
@@ -142,8 +142,17 @@ template<typename AddendMetric>
void
SumMetric<AddendMetric>::addToPart(Metric& m) const
{
- std::pair<std::vector<Metric::UP>, Metric::UP> sum(generateSum());
- sum.second->addToPart(m);
+ if (!m.is_sum_metric()) {
+ std::pair<std::vector<Metric::UP>, Metric::UP> sum(generateSum());
+ sum.second->addToPart(m);
+ }
+}
+
+template<typename AddendMetric>
+bool
+SumMetric<AddendMetric>::is_sum_metric() const
+{
+ return true;
}
template<typename AddendMetric>
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java
index 47337518a65..39183688340 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainer.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainer.java
@@ -20,19 +20,22 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
- * The application maintainer detects manual operator changes to nodes and redeploys affected applications.
- * The purpose of this is to redeploy affected applications faster than achieved by the regular application
- * maintenance to reduce the time period where the node repository and the application model is out of sync.
+ * This maintainer detects changes to nodes that must be expedited, and redeploys affected applications.
+ *
+ * The purpose of this is to redeploy affected applications faster than achieved by
+ * {@link PeriodicApplicationMaintainer}, to reduce the time period where the node repository and the application model
+ * is out of sync.
*
* Why can't the manual change directly make the application redeployment?
- * Because the redeployment must run at the right config server, while the node state change may be running
- * at any config server.
+ *
+ * Because we want to queue redeployments to avoid overloading config servers.
*
* @author bratseth
+ * @author mpolden
*/
-public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer {
+public class ExpeditedChangeApplicationMaintainer extends ApplicationMaintainer {
- OperatorChangeApplicationMaintainer(Deployer deployer, Metric metric, NodeRepository nodeRepository, Duration interval) {
+ ExpeditedChangeApplicationMaintainer(Deployer deployer, Metric metric, NodeRepository nodeRepository, Duration interval) {
super(deployer, metric, nodeRepository, interval);
}
@@ -57,7 +60,7 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer {
boolean deployed = deployWithLock(application);
if (deployed)
log.info("Redeployed application " + application.toShortString() +
- " as a manual change was made to its nodes");
+ " as an expedited change was made to its nodes");
}
private boolean hasNodesWithChanges(ApplicationId applicationId, NodeList nodes) {
@@ -66,7 +69,7 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer {
return nodes.stream()
.flatMap(node -> node.history().events().stream())
- .filter(event -> event.agent() == Agent.operator)
+ .filter(event -> expediteChangeBy(event.agent()))
.map(History.Event::at)
.anyMatch(e -> lastDeployTime.get().isBefore(e));
}
@@ -84,5 +87,14 @@ public class OperatorChangeApplicationMaintainer extends ApplicationMaintainer {
.groupingBy(node -> node.allocation().get().owner());
}
+ /** Returns whether to expedite changes performed by agent */
+ private boolean expediteChangeBy(Agent agent) {
+ switch (agent) {
+ case operator:
+ case RebuildingOsUpgrader:
+ case HostEncrypter: return true;
+ }
+ return false;
+ }
}
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
index 4537d99d107..9f84322fe0f 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporter.java
@@ -200,8 +200,10 @@ public class MetricsReporter extends NodeRepositoryMaintainer {
metric.set("wantToDeprovision", node.status().wantToDeprovision() ? 1 : 0, context);
metric.set("failReport", NodeFailer.reasonsToFailParentHost(node).isEmpty() ? 0 : 1, context);
- metric.set("wantToEncrypt", node.reports().getReport("wantToEncrypt").isPresent() ? 1 : 0, context);
- metric.set("diskEncrypted", node.reports().getReport("diskEncrypted").isPresent() ? 1 : 0, context);
+ if (node.type().isHost()) {
+ metric.set("wantToEncrypt", node.reports().getReport("wantToEncrypt").isPresent() ? 1 : 0, context);
+ metric.set("diskEncrypted", node.reports().getReport("diskEncrypted").isPresent() ? 1 : 0, context);
+ }
HostName hostname = new HostName(node.hostname());
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java
index 40f9d17519c..cdb5202603a 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java
@@ -48,7 +48,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent {
maintainers.add(new NodeFailer(deployer, nodeRepository, defaults.failGrace, defaults.nodeFailerInterval, orchestrator, defaults.throttlePolicy, metric));
maintainers.add(new NodeHealthTracker(hostLivenessTracker, serviceMonitor, nodeRepository, defaults.nodeFailureStatusUpdateInterval, metric));
- maintainers.add(new OperatorChangeApplicationMaintainer(deployer, metric, nodeRepository, defaults.operatorChangeRedeployInterval));
+ maintainers.add(new ExpeditedChangeApplicationMaintainer(deployer, metric, nodeRepository, defaults.expeditedChangeRedeployInterval));
maintainers.add(new ReservationExpirer(nodeRepository, defaults.reservationExpiry, metric));
maintainers.add(new RetiredExpirer(nodeRepository, orchestrator, deployer, metric, defaults.retiredInterval, defaults.retiredExpiry));
maintainers.add(new InactiveExpirer(nodeRepository, defaults.inactiveExpiry, Map.of(NodeType.config, defaults.inactiveConfigServerExpiry,
@@ -92,7 +92,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent {
/** Time between each run of maintainer that does periodic redeployment */
private final Duration redeployMaintainerInterval;
/** Applications are redeployed after manual operator changes within this time period */
- private final Duration operatorChangeRedeployInterval;
+ private final Duration expeditedChangeRedeployInterval;
/** The time a node must be continuously unresponsive before it is failed */
private final Duration failGrace;
@@ -135,7 +135,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent {
nodeFailerInterval = Duration.ofMinutes(15);
nodeFailureStatusUpdateInterval = Duration.ofMinutes(2);
nodeMetricsCollectionInterval = Duration.ofMinutes(1);
- operatorChangeRedeployInterval = Duration.ofMinutes(3);
+ expeditedChangeRedeployInterval = Duration.ofMinutes(3);
// Vespa upgrade frequency is higher in CD so (de)activate OS upgrades more frequently as well
osUpgradeActivatorInterval = zone.system().isCd() ? Duration.ofSeconds(30) : Duration.ofMinutes(5);
periodicRedeployInterval = Duration.ofMinutes(60);
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java
index db6aebacddc..62d09c99f16 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/OperatorChangeApplicationMaintainerTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ExpeditedChangeApplicationMaintainerTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals;
/**
* @author bratseth
*/
-public class OperatorChangeApplicationMaintainerTest {
+public class ExpeditedChangeApplicationMaintainerTest {
@Test
public void test_application_maintenance() {
@@ -42,10 +42,10 @@ public class OperatorChangeApplicationMaintainerTest {
// Create applications
fixture.activate();
assertEquals("Initial applications are deployed", 3, fixture.deployer.redeployments);
- OperatorChangeApplicationMaintainer maintainer = new OperatorChangeApplicationMaintainer(fixture.deployer,
- new TestMetric(),
- nodeRepository,
- Duration.ofMinutes(1));
+ ExpeditedChangeApplicationMaintainer maintainer = new ExpeditedChangeApplicationMaintainer(fixture.deployer,
+ new TestMetric(),
+ nodeRepository,
+ Duration.ofMinutes(1));
clock.advance(Duration.ofMinutes(2));
maintainer.maintain();
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java
index 4251d1276f8..7fa6810c1ba 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/MetricsReporterTest.java
@@ -43,6 +43,7 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -113,8 +114,6 @@ public class MetricsReporterTest {
expectedMetrics.put("wantToDeprovision", 0);
expectedMetrics.put("failReport", 0);
- expectedMetrics.put("wantToEncrypt", 0);
- expectedMetrics.put("diskEncrypted", 0);
expectedMetrics.put("allowedToBeDown", 1);
expectedMetrics.put("suspended", 1);
@@ -150,6 +149,27 @@ public class MetricsReporterTest {
assertEquals(expectedMetrics, new TreeMap<>(metric.values));
}
+ @Test
+ public void test_registered_metrics_for_host() {
+ NodeFlavors nodeFlavors = FlavorConfigBuilder.createDummies("default");
+ Orchestrator orchestrator = mock(Orchestrator.class);
+ when(orchestrator.getHostInfo(eq(reference), any())).thenReturn(
+ HostInfo.createSuspended(HostStatus.ALLOWED_TO_BE_DOWN, Instant.ofEpochSecond(1)));
+ ProvisioningTester tester = new ProvisioningTester.Builder().flavors(nodeFlavors.getFlavors()).orchestrator(orchestrator).build();
+ tester.makeProvisionedNodes(1, "default", NodeType.host, 0);
+
+ tester.clock().setInstant(Instant.ofEpochSecond(124));
+
+ TestMetric metric = new TestMetric();
+ MetricsReporter metricsReporter = metricsReporter(metric, tester);
+ metricsReporter.maintain();
+
+ // Only verify metrics that are set for hosts
+ TreeMap<String, Number> metrics = new TreeMap<>(metric.values);
+ assertTrue(metrics.containsKey("wantToEncrypt"));
+ assertTrue(metrics.containsKey("diskEncrypted"));
+ }
+
private void verifyAndRemoveIntegerMetricSum(TestMetric metric, String key, int expected) {
assertEquals(expected, (int) metric.sumNumberValues(key));
metric.remove(key);
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json
index 73ac692e37b..26d711945c6 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/restapi/responses/maintenance.json
@@ -7,6 +7,9 @@
"name": "DirtyExpirer"
},
{
+ "name": "ExpeditedChangeApplicationMaintainer"
+ },
+ {
"name": "FailedExpirer"
},
{
@@ -37,9 +40,6 @@
"name": "NodeRebooter"
},
{
- "name": "OperatorChangeApplicationMaintainer"
- },
- {
"name": "OsUpgradeActivator"
},
{
diff --git a/searchlib/src/tests/predicate/predicate_index_test.cpp b/searchlib/src/tests/predicate/predicate_index_test.cpp
index facf0054c4a..19ad0301b5c 100644
--- a/searchlib/src/tests/predicate/predicate_index_test.cpp
+++ b/searchlib/src/tests/predicate/predicate_index_test.cpp
@@ -113,7 +113,6 @@ TEST("require that PredicateIndex can index document") {
EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid());
indexFeature(index, doc_id, min_feature, {{hash, interval}}, {});
index.commit();
-
auto posting_it = lookupPosting(index, hash);
EXPECT_EQUAL(doc_id, posting_it.getKey());
uint32_t size;
@@ -123,6 +122,25 @@ TEST("require that PredicateIndex can index document") {
EXPECT_EQUAL(interval, interval_list[0]);
}
+TEST("require that bit vector cache is initialized correctly") {
+ BitVectorCache::KeyAndCountSet keySet;
+ keySet.emplace_back(hash, dummy_provider.getDocIdLimit()/2);
+ PredicateIndex index(generation_holder, dummy_provider, simple_index_config, 10);
+ EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid());
+ indexFeature(index, doc_id, min_feature, {{hash, interval}}, {});
+ index.requireCachePopulation();
+ index.populateIfNeeded(dummy_provider.getDocIdLimit());
+ EXPECT_TRUE(index.lookupCachedSet(keySet).empty());
+ index.commit();
+ EXPECT_TRUE(index.getIntervalIndex().lookup(hash).valid());
+ EXPECT_TRUE(index.lookupCachedSet(keySet).empty());
+
+ index.requireCachePopulation();
+ index.populateIfNeeded(dummy_provider.getDocIdLimit());
+ EXPECT_FALSE(index.lookupCachedSet(keySet).empty());
+}
+
+
TEST("require that PredicateIndex can index document with bounds") {
PredicateIndex index(generation_holder, dummy_provider, simple_index_config, 10);
EXPECT_FALSE(index.getIntervalIndex().lookup(hash).valid());
diff --git a/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp b/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp
index 3dd2ec26dea..5b8d5f5b9ce 100644
--- a/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp
+++ b/searchlib/src/tests/queryeval/predicate/predicate_blueprint_test.cpp
@@ -86,8 +86,7 @@ TEST_F("require that blueprint with empty index estimates empty.", Fixture) {
EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits);
}
-TEST_F("require that blueprint with zero-constraint doc estimates non-empty.",
- Fixture) {
+TEST_F("require that blueprint with zero-constraint doc estimates non-empty.", Fixture) {
f.indexEmptyDocument(42);
PredicateBlueprint blueprint(f.field, f.guard(), f.query);
EXPECT_FALSE(blueprint.getState().estimate().empty);
@@ -98,11 +97,9 @@ const int min_feature = 1;
const uint32_t doc_id = 2;
const uint32_t interval = 0x0001ffff;
-TEST_F("require that blueprint with posting list entry estimates non-empty.",
- Fixture) {
+TEST_F("require that blueprint with posting list entry estimates non-empty.", Fixture) {
PredicateTreeAnnotations annotations(min_feature);
- annotations.interval_map[PredicateHash::hash64("key=value")] =
- std::vector<Interval>{{interval}};
+ annotations.interval_map[PredicateHash::hash64("key=value")] = std::vector<Interval>{{interval}};
f.indexDocument(doc_id, annotations);
PredicateBlueprint blueprint(f.field, f.guard(), f.query);
@@ -110,8 +107,7 @@ TEST_F("require that blueprint with posting list entry estimates non-empty.",
EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits);
}
-TEST_F("require that blueprint with 'bounds' posting list entry estimates "
- "non-empty.", Fixture) {
+TEST_F("require that blueprint with 'bounds' posting list entry estimates non-empty.", Fixture) {
PredicateTreeAnnotations annotations(min_feature);
annotations.bounds_map[PredicateHash::hash64("range_key=40")] =
std::vector<IntervalWithBounds>{{interval, 0x80000003}};
@@ -122,34 +118,50 @@ TEST_F("require that blueprint with 'bounds' posting list entry estimates "
EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits);
}
-TEST_F("require that blueprint with zstar-compressed estimates non-empty.",
- Fixture) {
+TEST_F("require that blueprint with zstar-compressed estimates non-empty.", Fixture) {
PredicateTreeAnnotations annotations(1);
- annotations.interval_map[Constants::z_star_compressed_hash] =std::vector<Interval>{{0xfffe0000}};
+ annotations.interval_map[Constants::z_star_compressed_hash] = std::vector<Interval>{{0xfffe0000}};
f.indexDocument(doc_id, annotations);
PredicateBlueprint blueprint(f.field, f.guard(), f.query);
EXPECT_FALSE(blueprint.getState().estimate().empty);
EXPECT_EQUAL(0u, blueprint.getState().estimate().estHits);
}
-TEST_F("require that blueprint can create search", Fixture) {
- PredicateTreeAnnotations annotations(1);
- annotations.interval_map[PredicateHash::hash64("key=value")] =std::vector<Interval>{{interval}};
- f.indexDocument(doc_id, annotations);
-
+void
+runQuery(Fixture & f, std::vector<uint32_t> expected, bool expectCachedSize, uint32_t expectedKV) {
PredicateBlueprint blueprint(f.field, f.guard(), f.query);
blueprint.fetchPostings(ExecuteInfo::TRUE);
+ EXPECT_EQUAL(expectCachedSize, blueprint.getCachedFeatures().size());
+ for (uint32_t docId : expected) {
+ EXPECT_EQUAL(expectedKV, uint32_t(blueprint.getKV()[docId]));
+ }
TermFieldMatchDataArray tfmda;
SearchIterator::UP it = blueprint.createLeafSearch(tfmda, true);
ASSERT_TRUE(it.get());
it->initFullRange();
EXPECT_EQUAL(SearchIterator::beginId(), it->getDocId());
- EXPECT_FALSE(it->seek(doc_id - 1));
- EXPECT_EQUAL(doc_id, it->getDocId());
- EXPECT_TRUE(it->seek(doc_id));
- EXPECT_EQUAL(doc_id, it->getDocId());
- EXPECT_FALSE(it->seek(doc_id + 1));
- EXPECT_TRUE(it->isAtEnd());
+ std::vector<uint32_t> actual;
+ for (it->seek(1); ! it->isAtEnd(); it->seek(it->getDocId()+1)) {
+ actual.push_back(it->getDocId());
+ }
+ EXPECT_EQUAL(expected.size(), actual.size());
+ for (size_t i(0); i < expected.size(); i++) {
+ EXPECT_EQUAL(expected[i], actual[i]);
+ }
+}
+
+TEST_F("require that blueprint can create search", Fixture) {
+ PredicateTreeAnnotations annotations(1);
+ annotations.interval_map[PredicateHash::hash64("key=value")] = std::vector<Interval>{{interval}};
+ for (size_t i(0); i < 9; i++) {
+ f.indexDocument(doc_id + i, annotations);
+ }
+ runQuery(f, {2,3,4,5,6,7,8,9,10}, 0, 1);
+ f.indexDocument(doc_id+9, annotations);
+ runQuery(f, {2, 3,4,5,6,7,8,9,10,11}, 0, 1);
+ f.index().requireCachePopulation();
+ f.indexDocument(doc_id+10, annotations);
+ runQuery(f, {2,3,4,5,6,7,8,9,10,11,12}, 1, 1);
}
TEST_F("require that blueprint can create more advanced search", Fixture) {
diff --git a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp
index c4a0e036a01..555117126a9 100644
--- a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp
@@ -100,8 +100,8 @@ PredicateAttribute::getValueCount(DocId) const
void
PredicateAttribute::onCommit()
{
- populateIfNeeded();
_index->commit();
+ populateIfNeeded();
incGeneration();
}
diff --git a/searchlib/src/vespa/searchlib/common/bitvectorcache.h b/searchlib/src/vespa/searchlib/common/bitvectorcache.h
index f81fd0163d8..a642d66f42f 100644
--- a/searchlib/src/vespa/searchlib/common/bitvectorcache.h
+++ b/searchlib/src/vespa/searchlib/common/bitvectorcache.h
@@ -41,6 +41,7 @@ public:
void adjustDocIdLimit(uint32_t docId);
void populate(uint32_t count, const PopulateInterface &);
bool needPopulation() const { return _needPopulation; }
+ void requirePopulation() { _needPopulation = true; }
private:
class KeyMeta {
public:
diff --git a/searchlib/src/vespa/searchlib/predicate/predicate_index.h b/searchlib/src/vespa/searchlib/predicate/predicate_index.h
index f4c89a2b369..49bf77f2fcc 100644
--- a/searchlib/src/vespa/searchlib/predicate/predicate_index.h
+++ b/searchlib/src/vespa/searchlib/predicate/predicate_index.h
@@ -54,8 +54,6 @@ private:
template <typename IntervalT>
void indexDocumentFeatures(uint32_t doc_id, const FeatureMap<IntervalT> &interval_map);
- PopulateInterface::Iterator::UP lookup(uint64_t key) const override;
-
public:
PredicateIndex(GenerationHolder &genHolder,
const DocIdLimitProvider &limit_provider,
@@ -105,6 +103,9 @@ public:
* Adjust size of structures to have space for docId.
*/
void adjustDocIdLimit(uint32_t docId);
+ PopulateInterface::Iterator::UP lookup(uint64_t key) const override;
+ // Exposed for testing
+ void requireCachePopulation() const { _cache.requirePopulation(); }
};
extern template class SimpleIndex<vespalib::datastore::EntryRef>;
diff --git a/searchlib/src/vespa/searchlib/predicate/simple_index.h b/searchlib/src/vespa/searchlib/predicate/simple_index.h
index 1398bb0817c..75dc540f787 100644
--- a/searchlib/src/vespa/searchlib/predicate/simple_index.h
+++ b/searchlib/src/vespa/searchlib/predicate/simple_index.h
@@ -168,8 +168,6 @@ private:
}
public:
- SimpleIndex(GenerationHolder &generation_holder, const DocIdLimitProvider &provider) :
- SimpleIndex(generation_holder, provider, SimpleIndexConfig()) {}
SimpleIndex(GenerationHolder &generation_holder,
const DocIdLimitProvider &provider, const SimpleIndexConfig &config)
: _generation_holder(generation_holder), _config(config), _limit_provider(provider) {}
diff --git a/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h b/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h
index 9609cd4f6c9..ef225e86c50 100644
--- a/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h
+++ b/searchlib/src/vespa/searchlib/queryeval/predicate_blueprint.h
@@ -50,8 +50,11 @@ public:
void fetchPostings(const ExecuteInfo &execInfo) override;
SearchIterator::UP
- createLeafSearch(const fef::TermFieldMatchDataArray &tfmda,
- bool strict) const override;
+ createLeafSearch(const fef::TermFieldMatchDataArray &tfmda, bool strict) const override;
+
+ // Exposed for testing
+ const BitVectorCache::CountVector & getKV() const { return _kV; }
+ const BitVectorCache::KeySet & getCachedFeatures() const { return _cachedFeatures; }
private:
using BTreeIterator = predicate::SimpleIndex<vespalib::datastore::EntryRef>::BTreeIterator;
using VectorIterator = predicate::SimpleIndex<vespalib::datastore::EntryRef>::VectorIterator;
@@ -70,24 +73,24 @@ private:
void addZeroConstraintToK();
std::vector<predicate::PredicatePostingList::UP> createPostingLists() const;
- const PredicateAttribute & _attribute;
+ const PredicateAttribute & _attribute;
const predicate::PredicateIndex &_index;
- Alloc _kVBacking;
- BitVectorCache::CountVector _kV;
- BitVectorCache::KeySet _cachedFeatures;
+ Alloc _kVBacking;
+ BitVectorCache::CountVector _kV;
+ BitVectorCache::KeySet _cachedFeatures;
- std::vector<IntervalEntry> _interval_dict_entries;
- std::vector<BoundsEntry> _bounds_dict_entries;
- vespalib::datastore::EntryRef _zstar_dict_entry;
+ std::vector<IntervalEntry> _interval_dict_entries;
+ std::vector<BoundsEntry> _bounds_dict_entries;
+ vespalib::datastore::EntryRef _zstar_dict_entry;
- std::vector<IntervalIteratorEntry<BTreeIterator>> _interval_btree_iterators;
+ std::vector<IntervalIteratorEntry<BTreeIterator>> _interval_btree_iterators;
std::vector<IntervalIteratorEntry<VectorIterator>> _interval_vector_iterators;
- std::vector<BoundsIteratorEntry<BTreeIterator>> _bounds_btree_iterators;
- std::vector<BoundsIteratorEntry<VectorIterator>> _bounds_vector_iterators;
+ std::vector<BoundsIteratorEntry<BTreeIterator>> _bounds_btree_iterators;
+ std::vector<BoundsIteratorEntry<VectorIterator>> _bounds_vector_iterators;
// The zstar iterator is either a vector or a btree iterator.
- optional<BTreeIterator> _zstar_btree_iterator;
+ optional<BTreeIterator> _zstar_btree_iterator;
optional<VectorIterator> _zstar_vector_iterator;
- bool _fetch_postings_done;
+ bool _fetch_postings_done;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 4f6e2d5016b..c94c1a415b0 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -748,7 +748,7 @@ void DistributorStripe::send_updated_host_info_if_required() {
if (_use_legacy_mode) {
_component.getStateUpdater().immediately_send_get_node_state_replies();
} else {
- _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(0); // TODO STRIPE correct stripe index!
+ _stripe_host_info_notifier.notify_stripe_wants_to_send_host_info(_stripe_index);
}
_must_send_updated_host_info = false;
}
diff --git a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java
index c085be7c205..561b20a9c8a 100644
--- a/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java
+++ b/vespa-athenz/src/main/java/com/yahoo/vespa/athenz/api/AthenzAccessToken.java
@@ -6,7 +6,10 @@ import com.auth0.jwt.interfaces.DecodedJWT;
import com.yahoo.vespa.athenz.utils.AthenzIdentities;
import java.time.Instant;
+import java.util.List;
import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
/**
* Represents an Athenz Access Token
@@ -18,6 +21,8 @@ public class AthenzAccessToken {
public static final String HTTP_HEADER_NAME = "Authorization";
private static final String BEARER_TOKEN_PREFIX = "Bearer ";
+ private static final String SCOPE_CLAIM = "scp";
+ private static final String AUDIENCE_CLAIM = "aud";
private final String value;
private volatile DecodedJWT jwt;
@@ -43,6 +48,12 @@ public class AthenzAccessToken {
return jwt().getExpiresAt().toInstant();
}
public AthenzIdentity getAthenzIdentity() { return AthenzIdentities.from(jwt().getClaim("client_id").asString()); }
+ public List<AthenzRole> roles() {
+ String domain = Optional.ofNullable(jwt().getClaim(AUDIENCE_CLAIM).asString()).orElse("");
+ return Optional.ofNullable(jwt().getClaim(SCOPE_CLAIM).asList(String.class)).orElse(List.of()).stream()
+ .map(role -> new AthenzRole(domain, role))
+ .collect(Collectors.toList());
+ }
private DecodedJWT jwt() {
if (jwt == null) {
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
index 5aff8dfc25d..83abe0bb872 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
@@ -84,7 +84,7 @@ public class CliClient {
builder.setHostnameVerifier(AcceptAllHostnameVerifier.INSTANCE);
}
cliArgs.certificateAndKey().ifPresent(c -> builder.setCertificate(c.certificateFile, c.privateKeyFile));
- cliArgs.caCertificates().ifPresent(builder::setCaCertificates);
+ cliArgs.caCertificates().ifPresent(builder::setCaCertificatesFile);
cliArgs.headers().forEach(builder::addRequestHeader);
return builder.build();
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
index 672f5f080b5..163da14cead 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
@@ -127,7 +127,7 @@ class ApacheCluster implements Cluster {
.setInitialWindowSize(Integer.MAX_VALUE)
.build());
- SSLContext sslContext = constructSslContext(builder);
+ SSLContext sslContext = builder.constructSslContext();
String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites()));
if (allowedCiphers.length == 0)
throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM");
@@ -142,19 +142,6 @@ class ApacheCluster implements Cluster {
.build();
}
- private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException {
- if (builder.sslContext != null) return builder.sslContext;
- SslContextBuilder sslContextBuilder = new SslContextBuilder();
- if (builder.certificate != null && builder.privateKey != null) {
- sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey);
- }
- if (builder.caCertificates != null) {
- sslContextBuilder.withCaCertificates(builder.caCertificates);
- }
- return sslContextBuilder.build();
- }
-
-
private static class ApacheHttpResponse implements HttpResponse {
private final SimpleHttpResponse wrapped;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
index d0a221ed358..df1f3bcd54c 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
@@ -7,8 +7,11 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Path;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -35,9 +38,12 @@ public class FeedClientBuilder {
int maxStreamsPerConnection = 128;
FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy;
FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Duration.ofSeconds(1), Duration.ofMinutes(10));
- Path certificate;
- Path privateKey;
- Path caCertificates;
+ Path certificateFile;
+ Path privateKeyFile;
+ Path caCertificatesFile;
+ Collection<X509Certificate> certificate;
+ PrivateKey privateKey;
+ Collection<X509Certificate> caCertificates;
public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); }
@@ -81,9 +87,6 @@ public class FeedClientBuilder {
}
public FeedClientBuilder setSslContext(SSLContext context) {
- if (certificate != null || caCertificates != null || privateKey != null) {
- throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates");
- }
this.sslContext = requireNonNull(context);
return this;
}
@@ -113,24 +116,77 @@ public class FeedClientBuilder {
}
public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) {
- if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and certificate");
- this.certificate = certificatePemFile;
- this.privateKey = privateKeyPemFile;
+ this.certificateFile = certificatePemFile;
+ this.privateKeyFile = privateKeyPemFile;
+ return this;
+ }
+
+ public FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) {
+ this.certificate = certificate;
+ this.privateKey = privateKey;
+ return this;
+ }
+
+ public FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey) {
+ return setCertificate(Collections.singletonList(certificate), privateKey);
+ }
+
+ public FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile) {
+ this.caCertificatesFile = caCertificatesFile;
return this;
}
- public FeedClientBuilder setCaCertificates(Path caCertificatesFile) {
- if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and CA certificate");
- this.caCertificates = caCertificatesFile;
+ public FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates) {
+ this.caCertificates = caCertificates;
return this;
}
public FeedClient build() {
try {
+ validateConfiguration();
return new HttpFeedClient(this);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
+ SSLContext constructSslContext() throws IOException {
+ if (sslContext != null) return sslContext;
+ SslContextBuilder sslContextBuilder = new SslContextBuilder();
+ if (certificateFile != null && privateKeyFile != null) {
+ sslContextBuilder.withCertificateAndKey(certificateFile, privateKeyFile);
+ } else if (certificate != null && privateKey != null) {
+ sslContextBuilder.withCertificateAndKey(certificate, privateKey);
+ }
+ if (caCertificatesFile != null) {
+ sslContextBuilder.withCaCertificates(caCertificatesFile);
+ } else if (caCertificates != null) {
+ sslContextBuilder.withCaCertificates(caCertificates);
+ }
+ return sslContextBuilder.build();
+ }
+
+ private void validateConfiguration() {
+ if (sslContext != null && (
+ certificateFile != null || caCertificatesFile != null || privateKeyFile != null ||
+ certificate != null || caCertificates != null || privateKey != null)) {
+ throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates");
+ }
+ if (certificate != null && certificateFile != null) {
+ throw new IllegalArgumentException("Cannot set both certificate directly and as file");
+ }
+ if (privateKey != null && privateKeyFile != null) {
+ throw new IllegalArgumentException("Cannot set both private key directly and as file");
+ }
+ if (caCertificates != null && caCertificatesFile != null) {
+ throw new IllegalArgumentException("Cannot set both CA certificates directly and as file");
+ }
+ if (certificate != null && certificate.isEmpty()) {
+ throw new IllegalArgumentException("Certificate cannot be empty");
+ }
+ if (caCertificates != null && caCertificates.isEmpty()) {
+ throw new IllegalArgumentException("CA certificates cannot be empty");
+ }
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index de32e7abdf5..b3a7aca1808 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -74,6 +74,31 @@ public class JsonFeeder implements Closeable {
public static Builder builder(FeedClient client) { return new Builder(client); }
+    /** Feeds a single JSON feed operation of the form
+ * <pre>
+ * {
+ * "id": "id:ns:type::boo",
+ * "fields": { ... document fields ... }
+ * }
+ * </pre>
+ */
+ public CompletableFuture<Result> feedSingle(String json) {
+ CompletableFuture<Result> result = new CompletableFuture<>();
+ try {
+ SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8));
+ parser.next().whenCompleteAsync((operationResult, error) -> {
+ if (error != null) {
+ result.completeExceptionally(error);
+ } else {
+ result.complete(operationResult);
+ }
+ }, resultExecutor);
+ } catch (Exception e) {
+ resultExecutor.execute(() -> result.completeExceptionally(e));
+ }
+ return result;
+ }
+
/** Feeds a stream containing a JSON array of feed operations on the form
* <pre>
* [
@@ -288,87 +313,111 @@ public class JsonFeeder implements Closeable {
}
}
+ private class SingleOperationParserAndExecutor extends OperationParserAndExecutor {
+
+ private final byte[] json;
+
+ SingleOperationParserAndExecutor(byte[] json) throws IOException {
+ super(factory.createParser(json), false);
+ this.json = json;
+ }
+
+ @Override
+ String getDocumentJson(long start, long end) {
+ return new String(json, (int) start, (int) (end - start), UTF_8);
+ }
+ }
+
private abstract class OperationParserAndExecutor {
private final JsonParser parser;
private final boolean multipleOperations;
+ private boolean arrayPrefixParsed;
protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException {
this.parser = parser;
this.multipleOperations = multipleOperations;
- if (multipleOperations) expect(START_ARRAY);
}
abstract String getDocumentJson(long start, long end);
CompletableFuture<Result> next() throws IOException {
- JsonToken token = parser.nextToken();
- if (token == END_ARRAY && multipleOperations) return null;
- else if (token == null && !multipleOperations) return null;
- else if (token == START_OBJECT);
- else throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset());
- long start = 0, end = -1;
- OperationType type = null;
- DocumentId id = null;
- OperationParameters parameters = protoParameters;
- loop: while (true) {
- switch (parser.nextToken()) {
- case FIELD_NAME:
- switch (parser.getText()) {
- case "id":
- case "put": type = PUT; id = readId(); break;
- case "update": type = UPDATE; id = readId(); break;
- case "remove": type = REMOVE; id = readId(); break;
- case "condition": parameters = parameters.testAndSetCondition(readString()); break;
- case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
- case "fields": {
- expect(START_OBJECT);
- start = parser.getTokenLocation().getByteOffset();
- int depth = 1;
- while (depth > 0) switch (parser.nextToken()) {
- case START_OBJECT: ++depth; break;
- case END_OBJECT: --depth; break;
+ try {
+ if (multipleOperations && !arrayPrefixParsed){
+ expect(START_ARRAY);
+ arrayPrefixParsed = true;
+ }
+
+ JsonToken token = parser.nextToken();
+ if (token == END_ARRAY && multipleOperations) return null;
+ else if (token == null && !multipleOperations) return null;
+ else if (token == START_OBJECT);
+ else throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset());
+ long start = 0, end = -1;
+ OperationType type = null;
+ DocumentId id = null;
+ OperationParameters parameters = protoParameters;
+ loop: while (true) {
+ switch (parser.nextToken()) {
+ case FIELD_NAME:
+ switch (parser.getText()) {
+ case "id":
+ case "put": type = PUT; id = readId(); break;
+ case "update": type = UPDATE; id = readId(); break;
+ case "remove": type = REMOVE; id = readId(); break;
+ case "condition": parameters = parameters.testAndSetCondition(readString()); break;
+ case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
+ case "fields": {
+ expect(START_OBJECT);
+ start = parser.getTokenLocation().getByteOffset();
+ int depth = 1;
+ while (depth > 0) switch (parser.nextToken()) {
+ case START_OBJECT: ++depth; break;
+ case END_OBJECT: --depth; break;
+ }
+ end = parser.getTokenLocation().getByteOffset() + 1;
+ break;
}
- end = parser.getTokenLocation().getByteOffset() + 1;
- break;
+ default: throw new JsonParseException("Unexpected field name '" + parser.getText() + "' at offset " +
+ parser.getTokenLocation().getByteOffset());
}
- default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
- }
- break;
+ break;
- case END_OBJECT:
- break loop;
+ case END_OBJECT:
+ break loop;
- default:
- throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
+ default:
+ throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " +
+ parser.getTokenLocation().getByteOffset());
+ }
}
- }
- if (id == null)
- throw new IllegalArgumentException("No document id for document at offset " + start);
-
- if (end < start)
- throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset());
- String payload = getDocumentJson(start, end);
- switch (type) {
- case PUT: return client.put (id, payload, parameters);
- case UPDATE: return client.update(id, payload, parameters);
- case REMOVE: return client.remove(id, parameters);
- default: throw new IllegalStateException("Unexpected operation type '" + type + "'");
+ if (id == null)
+ throw new JsonParseException("No document id for document at offset " + start);
+
+ if (end < start)
+ throw new JsonParseException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset());
+ String payload = getDocumentJson(start, end);
+ switch (type) {
+ case PUT: return client.put (id, payload, parameters);
+ case UPDATE: return client.update(id, payload, parameters);
+ case REMOVE: return client.remove(id, parameters);
+ default: throw new JsonParseException("Unexpected operation type '" + type + "'");
+ }
+ } catch (com.fasterxml.jackson.core.JacksonException e) {
+ throw new JsonParseException("Failed to parse JSON", e);
}
}
- void expect(JsonToken token) throws IOException {
+ private void expect(JsonToken token) throws IOException {
if (parser.nextToken() != token)
- throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ throw new JsonParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
}
private String readString() throws IOException {
String value = parser.nextTextValue();
if (value == null)
- throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ throw new JsonParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
return value;
@@ -377,7 +426,7 @@ public class JsonFeeder implements Closeable {
private boolean readBoolean() throws IOException {
Boolean value = parser.nextBooleanValue();
if (value == null)
- throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ throw new JsonParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
return value;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java
new file mode 100644
index 00000000000..785ffd8eb4c
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java
@@ -0,0 +1,13 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+/**
+ * @author bjorncs
+ */
+public class JsonParseException extends FeedException {
+
+ public JsonParseException(String message) { super(message); }
+
+ public JsonParseException(String message, Throwable cause) { super(message, cause); }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
index 7200d5fd943..9114e22f4a6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
@@ -20,11 +20,14 @@ import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.KeyStore;
+import java.security.KeyStoreException;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
/**
@@ -39,6 +42,9 @@ class SslContextBuilder {
private Path certificateFile;
private Path privateKeyFile;
private Path caCertificatesFile;
+ private Collection<X509Certificate> certificate;
+ private PrivateKey privateKey;
+ private Collection<X509Certificate> caCertificates;
SslContextBuilder withCertificateAndKey(Path certificate, Path privateKey) {
this.certificateFile = certificate;
@@ -46,20 +52,35 @@ class SslContextBuilder {
return this;
}
+ SslContextBuilder withCertificateAndKey(Collection<X509Certificate> certificate, PrivateKey privateKey) {
+ this.certificate = certificate;
+ this.privateKey = privateKey;
+ return this;
+ }
+
SslContextBuilder withCaCertificates(Path caCertificates) {
this.caCertificatesFile = caCertificates;
return this;
}
+ SslContextBuilder withCaCertificates(Collection<X509Certificate> caCertificates) {
+ this.caCertificates = caCertificates;
+ return this;
+ }
+
SSLContext build() throws IOException {
try {
KeyStore keystore = KeyStore.getInstance("PKCS12");
keystore.load(null);
if (certificateFile != null && privateKeyFile != null) {
keystore.setKeyEntry("cert", privateKey(privateKeyFile), new char[0], certificates(certificateFile));
+ } else if (certificate != null && privateKey != null) {
+ keystore.setKeyEntry("cert", privateKey, new char[0], certificate.toArray(new Certificate[0]));
}
if (caCertificatesFile != null) {
- keystore.setCertificateEntry("ca-cert", certificates(caCertificatesFile)[0]);
+ addCaCertificates(keystore, Arrays.asList(certificates(caCertificatesFile)));
+ } else if (caCertificates != null) {
+ addCaCertificates(keystore, caCertificates);
}
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keystore, new char[0]);
@@ -73,6 +94,13 @@ class SslContextBuilder {
}
}
+ private static void addCaCertificates(KeyStore keystore, Collection<? extends Certificate> certificates) throws KeyStoreException {
+ int i = 0;
+ for (Certificate cert : certificates) {
+ keystore.setCertificateEntry("ca-cert-" + ++i, cert);
+ }
+ }
+
private static Certificate[] certificates(Path file) throws IOException, GeneralSecurityException {
try (PEMParser parser = new PEMParser(Files.newBufferedReader(file))) {
List<X509Certificate> result = new ArrayList<>();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
index e2ae5bc7155..03194e23d47 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
@@ -11,6 +11,7 @@ import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -51,52 +52,72 @@ class JsonFeederTest {
try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) {
AtomicInteger resultsReceived = new AtomicInteger();
AtomicBoolean completedSuccessfully = new AtomicBoolean();
- Set<String> ids = new HashSet<>();
long startNanos = System.nanoTime();
- JsonFeeder.builder(new FeedClient() {
+ SimpleClient feedClient = new SimpleClient();
+ JsonFeeder.builder(feedClient).build()
+ .feedMany(in, 1 << 7,
+ new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document
+ @Override
+ public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); }
- @Override
- public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
- ids.add(documentId.userSpecific());
- return createSuccessResult(documentId);
- }
+ @Override
+ public void onError(Throwable error) { exceptionThrow.set(error); }
- @Override
- public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
- return createSuccessResult(documentId);
- }
+ @Override
+ public void onComplete() { completedSuccessfully.set(true); }
+ })
+ .join();
- @Override
- public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
- return createSuccessResult(documentId);
- }
+ System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+ assertEquals(docs + 1, feedClient.ids.size());
+ assertEquals(docs + 1, resultsReceived.get());
+ assertTrue(completedSuccessfully.get());
+ assertNull(exceptionThrow.get());
+ }
+ }
- @Override
- public OperationStats stats() { return null; }
+ @Test
+ public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
+ try (JsonFeeder feeder = JsonFeeder.builder(new SimpleClient()).build()) {
+ String json = "{\"put\": \"id:ns:type::abc1\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " }\n";
+ Result result = feeder.feedSingle(json).get();
+ assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId());
+ assertEquals(Result.Type.success, result.type());
+ assertEquals("success", result.resultMessage().get());
+ }
+ }
- @Override
- public void close(boolean graceful) { }
+ private static class SimpleClient implements FeedClient {
+ final Set<String> ids = new HashSet<>();
- private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
- return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null));
- }
+ @Override
+ public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+ ids.add(documentId.userSpecific());
+ return createSuccessResult(documentId);
+ }
- }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document
- @Override
- public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); }
+ @Override
+ public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+ return createSuccessResult(documentId);
+ }
- @Override
- public void onError(Throwable error) { exceptionThrow.set(error); }
+ @Override
+ public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+ return createSuccessResult(documentId);
+ }
- @Override
- public void onComplete() { completedSuccessfully.set(true); }
- }).join();
+ @Override
+ public OperationStats stats() { return null; }
- System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
- assertEquals(docs + 1, ids.size());
- assertEquals(docs + 1, resultsReceived.get());
- assertTrue(completedSuccessfully.get());
- assertNull(exceptionThrow.get());
+ @Override
+ public void close(boolean graceful) { }
+
+ private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
+ return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null));
}
}
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml
index 382c28dc884..39f10d84f9b 100644
--- a/vespa-hadoop/pom.xml
+++ b/vespa-hadoop/pom.xml
@@ -101,17 +101,22 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
-
<!-- Vespa feeding dependencies -->
<dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>vespa-http-client</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>vespa-feed-client</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
diff --git a/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java
new file mode 100644
index 00000000000..5974a8df271
--- /dev/null
+++ b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java
@@ -0,0 +1,18 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.Result.Type;
+
+/**
+ * Workaround for package-private {@link Result} constructor.
+ *
+ * @author bjorncs
+ */
+public class DryrunResult {
+
+ private DryrunResult() {}
+
+ public static Result create(Type type, DocumentId documentId, String resultMessage, String traceMessage) {
+ return new Result(type, documentId, resultMessage, traceMessage);
+ }
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
new file mode 100644
index 00000000000..b716c55beb5
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
@@ -0,0 +1,235 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedClientFactory;
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+import java.io.IOException;
+import java.io.StringReader;
+import java.time.Duration;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+/**
+ * {@link LegacyVespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints using vespa-http-client.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class LegacyVespaRecordWriter extends RecordWriter {
+
+ private final static Logger log = Logger.getLogger(LegacyVespaRecordWriter.class.getCanonicalName());
+
+ private boolean initialized = false;
+ private FeedClient feedClient;
+ private final VespaCounters counters;
+ private final int progressInterval;
+
+ final VespaConfiguration configuration;
+
+ LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
+ this.counters = counters;
+ this.configuration = configuration;
+ this.progressInterval = configuration.progressInterval();
+ }
+
+
+ @Override
+ public void write(Object key, Object data) throws IOException, InterruptedException {
+ if (!initialized) {
+ initialize();
+ }
+
+ String doc = data.toString().trim();
+
+ // Parse data to find document id - if none found, skip this write
+ String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc)
+ : findDocIdFromXml(doc);
+ if (docId != null && docId.length() >= 0) {
+ feedClient.stream(docId, doc);
+ counters.incrementDocumentsSent(1);
+ } else {
+ counters.incrementDocumentsSkipped(1);
+ }
+
+ if (counters.getDocumentsSent() % progressInterval == 0) {
+ String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
+ counters.getDocumentsSent(),
+ counters.getDocumentsOk(),
+ counters.getDocumentsFailed(),
+ counters.getDocumentsSkipped());
+ log.info(progress);
+ }
+
+ }
+
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ if (feedClient != null) {
+ feedClient.close();
+ }
+ }
+
+ protected ConnectionParams.Builder configureConnectionParams() {
+ ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
+ connParamsBuilder.setDryRun(configuration.dryrun());
+ connParamsBuilder.setUseCompression(configuration.useCompression());
+ connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
+ connParamsBuilder.setMaxRetries(configuration.numRetries());
+ if (configuration.proxyHost() != null) {
+ connParamsBuilder.setProxyHost(configuration.proxyHost());
+ }
+ if (configuration.proxyPort() >= 0) {
+ connParamsBuilder.setProxyPort(configuration.proxyPort());
+ }
+ return connParamsBuilder;
+ }
+
+ protected FeedParams.Builder configureFeedParams() {
+ FeedParams.Builder feedParamsBuilder = new FeedParams.Builder();
+ feedParamsBuilder.setDataFormat(configuration.dataFormat());
+ feedParamsBuilder.setRoute(configuration.route());
+ feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs());
+ feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests());
+ feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis());
+ return feedParamsBuilder;
+ }
+
+ protected SessionParams.Builder configureSessionParams() {
+ SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder();
+ sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize());
+ sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2);
+ return sessionParamsBuilder;
+ }
+
+ private void initialize() {
+ if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) {
+ int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs());
+ log.info("VespaStorage: Delaying startup by " + delay + " ms");
+ try {
+ Thread.sleep(delay);
+ } catch (Exception e) {}
+ }
+
+ ConnectionParams.Builder connParamsBuilder = configureConnectionParams();
+ FeedParams.Builder feedParamsBuilder = configureFeedParams();
+ SessionParams.Builder sessionParams = configureSessionParams();
+
+ sessionParams.setConnectionParams(connParamsBuilder.build());
+ sessionParams.setFeedParams(feedParamsBuilder.build());
+
+ String endpoints = configuration.endpoint();
+ StringTokenizer tokenizer = new StringTokenizer(endpoints, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String endpoint = tokenizer.nextToken().trim();
+ sessionParams.addCluster(new Cluster.Builder().addEndpoint(
+ Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL())
+ ).build());
+ }
+
+ ResultCallback resultCallback = new ResultCallback(counters);
+ feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback);
+
+ initialized = true;
+ log.info("VespaStorage configuration:\n" + configuration.toString());
+ log.info(feedClient.getStatsAsJson());
+ }
+
+ private String findDocIdFromXml(String xml) {
+ try {
+ XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
+ while (eventReader.hasNext()) {
+ XMLEvent event = eventReader.nextEvent();
+ if (event.getEventType() == XMLEvent.START_ELEMENT) {
+ StartElement element = event.asStartElement();
+ String elementName = element.getName().getLocalPart();
+ if (VespaDocumentOperation.Operation.valid(elementName)) {
+ return element.getAttributeByName(QName.valueOf("documentid")).getValue();
+ }
+ }
+ }
+ } catch (XMLStreamException | FactoryConfigurationError e) {
+ // as json dude does
+ return null;
+ }
+ return null;
+ }
+
+ private String findDocId(String json) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ try(JsonParser parser = factory.createParser(json)) {
+ if (parser.nextToken() != JsonToken.START_OBJECT) {
+ return null;
+ }
+ while (parser.nextToken() != JsonToken.END_OBJECT) {
+ String fieldName = parser.getCurrentName();
+ parser.nextToken();
+ if (VespaDocumentOperation.Operation.valid(fieldName)) {
+ String docId = parser.getText();
+ return docId;
+ } else {
+ parser.skipChildren();
+ }
+ }
+ } catch (JsonParseException ex) {
+ return null;
+ }
+ return null;
+ }
+
+
+ static class ResultCallback implements FeedClient.ResultCallback {
+ final VespaCounters counters;
+
+ public ResultCallback(VespaCounters counters) {
+ this.counters = counters;
+ }
+
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ if (!documentResult.isSuccess()) {
+ counters.incrementDocumentsFailed(1);
+ StringBuilder sb = new StringBuilder();
+ sb.append("Problems with docid ");
+ sb.append(docId);
+ sb.append(": ");
+ List<Result.Detail> details = documentResult.getDetails();
+ for (Result.Detail detail : details) {
+ sb.append(detail.toString());
+ sb.append(" ");
+ }
+ log.warning(sb.toString());
+ return;
+ }
+ counters.incrementDocumentsOk(1);
+ }
+
+ }
+
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
index bef51e9ae08..97bc7dc838e 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
@@ -10,7 +10,7 @@ import java.util.Properties;
/**
* An output specification for writing to Vespa instances in a Map-Reduce job.
- * Mainly returns an instance of a {@link VespaRecordWriter} that does the
+ * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the
* actual feeding to Vespa.
*
* @author lesters
@@ -35,7 +35,9 @@ public class VespaOutputFormat extends OutputFormat {
public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
VespaCounters counters = VespaCounters.get(context);
VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride);
- return new VespaRecordWriter(configuration, counters);
+ return configuration.useLegacyClient()
+ ? new LegacyVespaRecordWriter(configuration, counters)
+ : new VespaRecordWriter(configuration, counters);
}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
index 4cc93bfd538..73e10c39419 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
@@ -1,83 +1,71 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hadoop.mapreduce;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.DryrunResult;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.JsonFeeder;
+import ai.vespa.feed.client.JsonParseException;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.OperationStats;
+import ai.vespa.feed.client.Result;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.FeedClientFactory;
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
-import com.yahoo.vespa.http.client.config.SessionParams;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import javax.xml.namespace.QName;
-import javax.xml.stream.FactoryConfigurationError;
-import javax.xml.stream.XMLEventReader;
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.events.StartElement;
-import javax.xml.stream.events.XMLEvent;
import java.io.IOException;
-import java.io.StringReader;
+import java.net.URI;
import java.time.Duration;
+import java.util.Arrays;
import java.util.List;
-import java.util.StringTokenizer;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;
+import static java.util.stream.Collectors.toList;
+
/**
- * VespaRecordWriter sends the output &lt;key, value&gt; to one or more Vespa endpoints.
+ * {@link VespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints using vespa-feed-client.
*
- * @author lesters
+ * @author bjorncs
*/
-@SuppressWarnings("rawtypes")
-public class VespaRecordWriter extends RecordWriter {
+public class VespaRecordWriter extends RecordWriter<Object, Object> {
private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName());
- private boolean initialized = false;
- private FeedClient feedClient;
private final VespaCounters counters;
- private final int progressInterval;
+ private final VespaConfiguration config;
- final VespaConfiguration configuration;
+ private boolean initialized = false;
+ private JsonFeeder feeder;
- VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
+ protected VespaRecordWriter(VespaConfiguration config, VespaCounters counters) {
this.counters = counters;
- this.configuration = configuration;
- this.progressInterval = configuration.progressInterval();
+ this.config = config;
}
-
@Override
- public void write(Object key, Object data) throws IOException, InterruptedException {
- if (!initialized) {
- initialize();
- }
-
- String doc = data.toString().trim();
-
- // Parse data to find document id - if none found, skip this write
- String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc)
- : findDocIdFromXml(doc);
- if (docId != null && docId.length() >= 0) {
- feedClient.stream(docId, doc);
- counters.incrementDocumentsSent(1);
- } else {
- counters.incrementDocumentsSkipped(1);
- }
-
- if (counters.getDocumentsSent() % progressInterval == 0) {
+ public void write(Object key, Object data) throws IOException {
+ initializeOnFirstWrite();
+ String json = data.toString().trim();
+ feeder.feedSingle(json)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ if (error instanceof JsonParseException) {
+ counters.incrementDocumentsSkipped(1);
+ } else {
+ log.warning("Failed to feed single document: " + error);
+ counters.incrementDocumentsFailed(1);
+ }
+ } else {
+ counters.incrementDocumentsOk(1);
+ }
+ });
+ counters.incrementDocumentsSent(1);
+ if (counters.getDocumentsSent() % config.progressInterval() == 0) {
String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
counters.getDocumentsSent(),
counters.getDocumentsOk(),
@@ -85,151 +73,115 @@ public class VespaRecordWriter extends RecordWriter {
counters.getDocumentsSkipped());
log.info(progress);
}
-
}
-
@Override
- public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- if (feedClient != null) {
- feedClient.close();
+ public void close(TaskAttemptContext context) throws IOException {
+ if (feeder != null) {
+ feeder.close();
+ feeder = null;
+ initialized = false;
}
}
- protected ConnectionParams.Builder configureConnectionParams() {
- ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
- connParamsBuilder.setDryRun(configuration.dryrun());
- connParamsBuilder.setUseCompression(configuration.useCompression());
- connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
- connParamsBuilder.setMaxRetries(configuration.numRetries());
- if (configuration.proxyHost() != null) {
- connParamsBuilder.setProxyHost(configuration.proxyHost());
- }
- if (configuration.proxyPort() >= 0) {
- connParamsBuilder.setProxyPort(configuration.proxyPort());
- }
- return connParamsBuilder;
- }
+ /** Override method to alter {@link FeedClient} configuration */
+ protected void onFeedClientInitialization(FeedClientBuilder builder) {}
- protected FeedParams.Builder configureFeedParams() {
- FeedParams.Builder feedParamsBuilder = new FeedParams.Builder();
- feedParamsBuilder.setDataFormat(configuration.dataFormat());
- feedParamsBuilder.setRoute(configuration.route());
- feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs());
- feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests());
- feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis());
- return feedParamsBuilder;
+ private void initializeOnFirstWrite() {
+ if (initialized) return;
+ validateConfig();
+ useRandomizedStartupDelayIfEnabled();
+ feeder = createJsonStreamFeeder();
+ initialized = true;
}
- protected SessionParams.Builder configureSessionParams() {
- SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder();
- sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize());
- sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2);
- return sessionParamsBuilder;
+ private void validateConfig() {
+ if (!config.useSSL()) {
+ throw new IllegalArgumentException("SSL is required for this feed client implementation");
+ }
+ if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) {
+ throw new IllegalArgumentException("Only JSON is support by this feed client implementation");
+ }
+ if (config.proxyHost() != null) {
+ log.warning(String.format("Ignoring proxy config (host='%s', port=%d)", config.proxyHost(), config.proxyPort()));
+ }
}
-
- private void initialize() {
- if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) {
- int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs());
- log.info("VespaStorage: Delaying startup by " + delay + " ms");
+
+ private void useRandomizedStartupDelayIfEnabled() {
+ if (!config.dryrun() && config.randomStartupSleepMs() > 0) {
+ int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs());
+ log.info("Delaying startup by " + delay + " ms");
try {
Thread.sleep(delay);
} catch (Exception e) {}
}
+ }
- ConnectionParams.Builder connParamsBuilder = configureConnectionParams();
- FeedParams.Builder feedParamsBuilder = configureFeedParams();
- SessionParams.Builder sessionParams = configureSessionParams();
-
- sessionParams.setConnectionParams(connParamsBuilder.build());
- sessionParams.setFeedParams(feedParamsBuilder.build());
- String endpoints = configuration.endpoint();
- StringTokenizer tokenizer = new StringTokenizer(endpoints, ",");
- while (tokenizer.hasMoreTokens()) {
- String endpoint = tokenizer.nextToken().trim();
- sessionParams.addCluster(new Cluster.Builder().addEndpoint(
- Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL())
- ).build());
+ private JsonFeeder createJsonStreamFeeder() {
+ FeedClient feedClient = createFeedClient();
+ JsonFeeder.Builder builder = JsonFeeder.builder(feedClient)
+ .withTimeout(Duration.ofMinutes(10));
+ if (config.route() != null) {
+ builder.withRoute(config.route());
}
+ return builder.build();
- ResultCallback resultCallback = new ResultCallback(counters);
- feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback);
-
- initialized = true;
- log.info("VespaStorage configuration:\n" + configuration.toString());
- log.info(feedClient.getStatsAsJson());
}
- private String findDocIdFromXml(String xml) {
- try {
- XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
- while (eventReader.hasNext()) {
- XMLEvent event = eventReader.nextEvent();
- if (event.getEventType() == XMLEvent.START_ELEMENT) {
- StartElement element = event.asStartElement();
- String elementName = element.getName().getLocalPart();
- if (VespaDocumentOperation.Operation.valid(elementName)) {
- return element.getAttributeByName(QName.valueOf("documentid")).getValue();
- }
- }
- }
- } catch (XMLStreamException | FactoryConfigurationError e) {
- // as json dude does
- return null;
+ private FeedClient createFeedClient() {
+ if (config.dryrun()) {
+ return new DryrunClient();
+ } else {
+ FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpointUris(config))
+ .setConnectionsPerEndpoint(config.numConnections())
+ .setMaxStreamPerConnection(streamsPerConnection(config))
+ .setRetryStrategy(retryStrategy(config));
+
+ onFeedClientInitialization(feedClientBuilder);
+ return feedClientBuilder.build();
}
- return null;
}
-
- private String findDocId(String json) throws IOException {
- JsonFactory factory = new JsonFactory();
- try(JsonParser parser = factory.createParser(json)) {
- if (parser.nextToken() != JsonToken.START_OBJECT) {
- return null;
- }
- while (parser.nextToken() != JsonToken.END_OBJECT) {
- String fieldName = parser.getCurrentName();
- parser.nextToken();
- if (VespaDocumentOperation.Operation.valid(fieldName)) {
- String docId = parser.getText();
- return docId;
- } else {
- parser.skipChildren();
- }
- }
- } catch (JsonParseException ex) {
- return null;
- }
- return null;
+
+ private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) {
+ int maxRetries = config.numRetries();
+ return new FeedClient.RetryStrategy() {
+ @Override public int retries() { return maxRetries; }
+ };
}
+ private static int streamsPerConnection(VespaConfiguration config) {
+ return Math.min(256, config.maxInFlightRequests() / config.numConnections());
+ }
+
+ private static List<URI> endpointUris(VespaConfiguration config) {
+ return Arrays.stream(config.endpoint().split(","))
+ .map(hostname -> URI.create(String.format("https://%s:%d/", hostname, config.defaultPort())))
+ .collect(toList());
+ }
- static class ResultCallback implements FeedClient.ResultCallback {
- final VespaCounters counters;
+ private static class DryrunClient implements FeedClient {
- public ResultCallback(VespaCounters counters) {
- this.counters = counters;
+ @Override
+ public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+ return createSuccessResult(documentId);
}
@Override
- public void onCompletion(String docId, Result documentResult) {
- if (!documentResult.isSuccess()) {
- counters.incrementDocumentsFailed(1);
- StringBuilder sb = new StringBuilder();
- sb.append("Problems with docid ");
- sb.append(docId);
- sb.append(": ");
- List<Result.Detail> details = documentResult.getDetails();
- for (Result.Detail detail : details) {
- sb.append(detail.toString());
- sb.append(" ");
- }
- log.warning(sb.toString());
- return;
- }
- counters.incrementDocumentsOk(1);
+ public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+ return createSuccessResult(documentId);
}
- }
+ @Override
+ public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+ return createSuccessResult(documentId);
+ }
+
+ @Override public OperationStats stats() { return null; }
+ @Override public void close(boolean graceful) {}
+ private static CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
+ return CompletableFuture.completedFuture(DryrunResult.create(Result.Type.success, documentId, "ok", null));
+ }
+ }
}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
index 2a1179dbec6..7219e621486 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
@@ -27,6 +27,7 @@ public class VespaConfiguration {
public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests";
public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms";
public static final String NUM_RETRIES = "vespa.feed.num.retries";
+ public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient";
private final Configuration conf;
private final Properties override;
@@ -130,6 +131,7 @@ public class VespaConfiguration {
return getInt(PROGRESS_REPORT, 1000);
}
+ public boolean useLegacyClient() { return getBoolean(USE_LEGACY_CLIENT, true); }
public String getString(String name) {
if (override != null && override.containsKey(name)) {
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
index ebb34dbc1b1..fa7965acbc1 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
@@ -4,10 +4,9 @@ package com.yahoo.vespa.hadoop.pig;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -17,22 +16,25 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.test.PathUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.*;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.StringTokenizer;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class MapReduceTest {
@@ -44,7 +46,7 @@ public class MapReduceTest {
protected static Path metricsJsonPath;
protected static Path metricsCsvPath;
- @BeforeClass
+ @BeforeAll
public static void setUp() throws IOException {
hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath());
@@ -62,7 +64,7 @@ public class MapReduceTest {
copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data");
}
- @AfterClass
+ @AfterAll
public static void tearDown() throws IOException {
Path testDir = new Path(hdfsBaseDir.getParent());
hdfs.delete(testDir, true);
@@ -82,7 +84,7 @@ public class MapReduceTest {
FileInputFormat.setInputPaths(job, metricsJsonPath);
boolean success = job.waitForCompletion(true);
- assertTrue("Job Failed", success);
+ assertTrue(success, "Job Failed");
VespaCounters counters = VespaCounters.get(job);
assertEquals(10, counters.getDocumentsSent());
@@ -103,7 +105,7 @@ public class MapReduceTest {
FileInputFormat.setInputPaths(job, metricsJsonPath);
boolean success = job.waitForCompletion(true);
- assertTrue("Job Failed", success);
+ assertTrue(success, "Job Failed");
VespaCounters counters = VespaCounters.get(job);
assertEquals(10, counters.getDocumentsSent());
@@ -125,7 +127,7 @@ public class MapReduceTest {
FileInputFormat.setInputPaths(job, metricsCsvPath);
boolean success = job.waitForCompletion(true);
- assertTrue("Job Failed", success);
+ assertTrue(success, "Job Failed");
VespaCounters counters = VespaCounters.get(job);
assertEquals(10, counters.getDocumentsSent());
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
index bafeb593e4f..db2fab9b05e 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -12,9 +12,9 @@ import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -23,22 +23,22 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@SuppressWarnings("serial")
public class VespaDocumentOperationTest {
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final PrintStream originalOut = System.out;
- @Before
+ @BeforeEach
public void setUpStreams() {
System.setOut(new PrintStream(outContent));
}
- @After
+ @AfterEach
public void restoreStreams() {
System.setOut(originalOut);
}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
index 2d55017b13e..b0e2dd32c04 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
@@ -8,12 +8,16 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class VespaQueryTest {
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
index 7ca401a0cc8..3565db37126 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -1,14 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hadoop.pig;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.mapred.Counters;
@@ -18,11 +12,14 @@ import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
public class VespaStorageTest {
@@ -51,6 +48,13 @@ public class VespaStorageTest {
assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig");
}
+ @Test
+ public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString());
+ conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString());
+ assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf);
+ }
@Test
public void requireThatCreateOperationsFeedSucceeds() throws Exception {
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
index 27080a8b2af..93e6a0abfdd 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
@@ -6,11 +6,11 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class TupleToolsTest {