summaryrefslogtreecommitdiffstats
path: root/vespa-hadoop
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2022-06-06 16:35:20 +0200
committergjoranv <gv@verizonmedia.com>2022-06-08 11:45:30 +0200
commit3c11ceb432b5bcb7639758330fad5433ec03f264 (patch)
tree6ba2c384ec40f377ce370b9a0dcaeaedac128067 /vespa-hadoop
parent22f76eae5d33da3c4acea18b827873b6ec36ef61 (diff)
Remove http client use
Diffstat (limited to 'vespa-hadoop')
-rw-r--r--vespa-hadoop/pom.xml5
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java233
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java18
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java8
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java38
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java18
6 files changed, 2 insertions, 318 deletions
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml
index 0c724c95839..1dde2d58610 100644
--- a/vespa-hadoop/pom.xml
+++ b/vespa-hadoop/pom.xml
@@ -136,11 +136,6 @@
<!-- Vespa feeding dependencies -->
<dependency>
<groupId>com.yahoo.vespa</groupId>
- <artifactId>vespa-http-client</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
<artifactId>vespa-feed-client</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
deleted file mode 100644
index 6900c7dc82f..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
+++ /dev/null
@@ -1,233 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import javax.xml.namespace.QName;
-import javax.xml.stream.FactoryConfigurationError;
-import javax.xml.stream.XMLEventReader;
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.events.StartElement;
-import javax.xml.stream.events.XMLEvent;
-import java.io.IOException;
-import java.io.StringReader;
-import java.time.Duration;
-import java.util.List;
-import java.util.StringTokenizer;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.logging.Logger;
-
-/**
- * {@link LegacyVespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints using vespa-http-client.
- *
- * @author lesters
- * @deprecated Replaced by {@link VespaRecordWriter}
- */
-@Deprecated
-public class LegacyVespaRecordWriter extends RecordWriter<Object, Object> {
-
- private final Logger log = Logger.getLogger(getClass().getCanonicalName());
-
- private boolean initialized = false;
- private com.yahoo.vespa.http.client.FeedClient feedClient;
- private final VespaCounters counters;
- private final int progressInterval;
-
- final VespaConfiguration configuration;
-
- LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
- this.counters = counters;
- this.configuration = configuration;
- this.progressInterval = configuration.progressInterval();
- }
-
-
- @Override
- public void write(Object key, Object data) throws IOException, InterruptedException {
- if (!initialized) {
- initialize();
- }
-
- String doc = data.toString().trim();
-
- // Parse data to find document id - if none found, skip this write
- String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc)
- : findDocIdFromXml(doc);
- if (docId != null && docId.length() >= 0) {
- feedClient.stream(docId, doc);
- counters.incrementDocumentsSent(1);
- } else {
- counters.incrementDocumentsSkipped(1);
- }
-
- if (counters.getDocumentsSent() % progressInterval == 0) {
- String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
- counters.getDocumentsSent(),
- counters.getDocumentsOk(),
- counters.getDocumentsFailed(),
- counters.getDocumentsSkipped());
- log.info(progress);
- }
-
- }
-
-
- @Override
- public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- if (feedClient != null) {
- feedClient.close();
- }
- }
-
- protected ConnectionParams.Builder configureConnectionParams() {
- ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
- connParamsBuilder.setDryRun(configuration.dryrun());
- connParamsBuilder.setUseCompression(configuration.useCompression());
- connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
- connParamsBuilder.setMaxRetries(configuration.numRetries());
- if (configuration.proxyHost() != null) {
- connParamsBuilder.setProxyHost(configuration.proxyHost());
- }
- if (configuration.proxyPort() >= 0) {
- connParamsBuilder.setProxyPort(configuration.proxyPort());
- }
- return connParamsBuilder;
- }
-
- protected FeedParams.Builder configureFeedParams() {
- FeedParams.Builder feedParamsBuilder = new FeedParams.Builder();
- feedParamsBuilder.setDataFormat(configuration.dataFormat());
- feedParamsBuilder.setRoute(configuration.route());
- feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs());
- feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests());
- feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis());
- return feedParamsBuilder;
- }
-
- protected SessionParams.Builder configureSessionParams() {
- SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder();
- sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize());
- sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2);
- return sessionParamsBuilder;
- }
-
- private void initialize() {
- if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) {
- int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs());
- log.info("VespaStorage: Delaying startup by " + delay + " ms");
- try {
- Thread.sleep(delay);
- } catch (Exception e) {}
- }
-
- ConnectionParams.Builder connParamsBuilder = configureConnectionParams();
- FeedParams.Builder feedParamsBuilder = configureFeedParams();
- SessionParams.Builder sessionParams = configureSessionParams();
-
- sessionParams.setConnectionParams(connParamsBuilder.build());
- sessionParams.setFeedParams(feedParamsBuilder.build());
-
- String endpoints = configuration.endpoint();
- StringTokenizer tokenizer = new StringTokenizer(endpoints, ",");
- while (tokenizer.hasMoreTokens()) {
- String endpoint = tokenizer.nextToken().trim();
- sessionParams.addCluster(new Cluster.Builder().addEndpoint(
- Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL().orElse(false))
- ).build());
- }
-
- ResultCallback resultCallback = new ResultCallback(counters);
- feedClient = com.yahoo.vespa.http.client.FeedClientFactory.create(sessionParams.build(), resultCallback);
-
- initialized = true;
- log.info("VespaStorage configuration:\n" + configuration.toString());
- log.info(feedClient.getStatsAsJson());
- }
-
- private String findDocIdFromXml(String xml) {
- try {
- XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
- while (eventReader.hasNext()) {
- XMLEvent event = eventReader.nextEvent();
- if (event.getEventType() == XMLEvent.START_ELEMENT) {
- StartElement element = event.asStartElement();
- String elementName = element.getName().getLocalPart();
- if (VespaDocumentOperation.Operation.valid(elementName)) {
- return element.getAttributeByName(QName.valueOf("documentid")).getValue();
- }
- }
- }
- } catch (XMLStreamException | FactoryConfigurationError e) {
- // as json dude does
- return null;
- }
- return null;
- }
-
- private String findDocId(String json) throws IOException {
- JsonFactory factory = new JsonFactory();
- try(JsonParser parser = factory.createParser(json)) {
- if (parser.nextToken() != JsonToken.START_OBJECT) {
- return null;
- }
- while (parser.nextToken() != JsonToken.END_OBJECT) {
- String fieldName = parser.getCurrentName();
- parser.nextToken();
- if (VespaDocumentOperation.Operation.valid(fieldName)) {
- String docId = parser.getText();
- return docId;
- } else {
- parser.skipChildren();
- }
- }
- } catch (JsonParseException ex) {
- return null;
- }
- return null;
- }
-
-
- class ResultCallback implements com.yahoo.vespa.http.client.FeedClient.ResultCallback {
- final VespaCounters counters;
-
- public ResultCallback(VespaCounters counters) {
- this.counters = counters;
- }
-
- @Override
- public void onCompletion(String docId, com.yahoo.vespa.http.client.Result documentResult) {
- if (!documentResult.isSuccess()) {
- counters.incrementDocumentsFailed(1);
- StringBuilder sb = new StringBuilder();
- sb.append("Problems with docid ");
- sb.append(docId);
- sb.append(": ");
- List<com.yahoo.vespa.http.client.Result.Detail> details = documentResult.getDetails();
- for (com.yahoo.vespa.http.client.Result.Detail detail : details) {
- sb.append(detail.toString());
- sb.append(" ");
- }
- log.warning(sb.toString());
- return;
- }
- counters.incrementDocumentsOk(1);
- }
-
- }
-
-}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
index 66ab94574d9..42eb6293eee 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
@@ -10,24 +10,16 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
-import java.util.Objects;
import java.util.Properties;
-import java.util.logging.Logger;
-
-import static com.yahoo.vespa.http.client.config.FeedParams.DataFormat.XML_UTF8;
/**
* An output specification for writing to Vespa instances in a Map-Reduce job.
- * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the
- * actual feeding to Vespa.
*
* @author lesters
*/
@SuppressWarnings("rawtypes")
public class VespaOutputFormat extends OutputFormat {
- private static final Logger log = Logger.getLogger(VespaOutputFormat.class.getName());
-
final Properties configOverride;
public VespaOutputFormat() {
@@ -42,18 +34,10 @@ public class VespaOutputFormat extends OutputFormat {
@Override
- @SuppressWarnings("deprecation")
public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException {
VespaCounters counters = VespaCounters.get(context);
VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride);
- Boolean useLegacyClient = configuration.useLegacyClient().orElse(null);
- if (Objects.equals(useLegacyClient, Boolean.TRUE) || configuration.dataFormat() == XML_UTF8) {
- log.warning("Feeding with legacy client or XML will no longer be supported on Vespa 8. " +
- "See https://docs.vespa.ai/en/vespa8-release-notes.html");
- return new LegacyVespaRecordWriter(configuration, counters);
- } else {
- return new VespaRecordWriter(configuration, counters);
- }
+ return new VespaRecordWriter(configuration, counters);
}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
index 6d6c3789835..c450d7cdeef 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
@@ -7,7 +7,6 @@ import ai.vespa.feed.client.JsonFeeder;
import ai.vespa.feed.client.OperationParseException;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import com.yahoo.vespa.http.client.config.FeedParams;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -85,18 +84,11 @@ public class VespaRecordWriter extends RecordWriter<Object, Object> {
private void initializeOnFirstWrite() {
if (initialized) return;
- validateConfig();
useRandomizedStartupDelayIfEnabled();
feeder = createJsonStreamFeeder();
initialized = true;
}
- private void validateConfig() {
- if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) {
- throw new IllegalArgumentException("Only JSON is support by this feed client implementation");
- }
- }
-
private void useRandomizedStartupDelayIfEnabled() {
if (!config.dryrun() && config.randomStartupSleepMs() > 0) {
int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs());
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
index 715546fe6fe..54be261fbe7 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hadoop.mapreduce.util;
-import com.yahoo.vespa.http.client.config.FeedParams;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
@@ -19,7 +18,6 @@ public class VespaConfiguration {
public static final String PROXY_SCHEME = "vespa.feed.proxy.scheme";
public static final String DRYRUN = "vespa.feed.dryrun";
public static final String USE_COMPRESSION = "vespa.feed.usecompression";
- public static final String DATA_FORMAT = "vespa.feed.data.format";
public static final String PROGRESS_REPORT = "vespa.feed.progress.interval";
public static final String CONNECTIONS = "vespa.feed.connections";
public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size";
@@ -29,7 +27,6 @@ public class VespaConfiguration {
public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests";
public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms";
public static final String NUM_RETRIES = "vespa.feed.num.retries";
- public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient";
private final Configuration conf;
private final Properties override;
@@ -39,115 +36,82 @@ public class VespaConfiguration {
this.override = override;
}
-
public static VespaConfiguration get(Configuration conf, Properties override) {
return new VespaConfiguration(conf, override);
}
-
public String endpoint() {
return getString(ENDPOINT);
}
-
public int defaultPort() {
return getInt(DEFAULT_PORT, 4080);
}
-
public Optional<Boolean> useSSL() {
String raw = getString(USE_SSL);
if (raw == null || raw.trim().isEmpty()) return Optional.empty();
return Optional.of(Boolean.parseBoolean(raw));
}
-
public String proxyHost() {
return getString(PROXY_HOST);
}
-
public int proxyPort() {
return getInt(PROXY_PORT, 4080);
}
-
public String proxyScheme() {
String raw = getString(PROXY_SCHEME);
if (raw == null) return "http";
return raw;
}
-
public boolean dryrun() {
return getBoolean(DRYRUN, false);
}
-
public boolean useCompression() {
return getBoolean(USE_COMPRESSION, true);
}
-
public int numConnections() {
return getInt(CONNECTIONS, 1);
}
-
public int throttlerMinSize() {
return getInt(THROTTLER_MIN_SIZE, 0);
}
-
public int queryConnectionTimeout() {
return getInt(QUERY_CONNECTION_TIMEOUT, 10000);
}
-
public String route() {
return getString(ROUTE);
}
-
public int maxSleepTimeMs() {
return getInt(MAX_SLEEP_TIME_MS, 10000);
}
-
public int maxInFlightRequests() {
return getInt(MAX_IN_FLIGHT_REQUESTS, 500);
}
-
public int randomStartupSleepMs() {
return getInt(RANDOM_STARTUP_SLEEP, 30000);
}
-
public int numRetries() {
return getInt(NUM_RETRIES, 100);
}
-
- public FeedParams.DataFormat dataFormat() {
- String format = getString(DATA_FORMAT);
- if ("xml".equalsIgnoreCase(format)) {
- return FeedParams.DataFormat.XML_UTF8;
- }
- return FeedParams.DataFormat.JSON_UTF8;
- }
-
-
public int progressInterval() {
return getInt(PROGRESS_REPORT, 1000);
}
- public Optional<Boolean> useLegacyClient() {
- String raw = getString(USE_LEGACY_CLIENT);
- if (raw == null || raw.trim().isEmpty()) return Optional.empty();
- return Optional.of(Boolean.parseBoolean(raw));
- }
-
public String getString(String name) {
if (override != null && override.containsKey(name)) {
return override.getProperty(name);
@@ -197,7 +161,6 @@ public class VespaConfiguration {
sb.append(PROXY_SCHEME + ": " + proxyScheme() + "\n");
sb.append(DRYRUN + ": " + dryrun() +"\n");
sb.append(USE_COMPRESSION + ": " + useCompression() +"\n");
- sb.append(DATA_FORMAT + ": " + dataFormat() +"\n");
sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n");
sb.append(CONNECTIONS + ": " + numConnections() +"\n");
sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n");
@@ -207,7 +170,6 @@ public class VespaConfiguration {
sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n");
sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() +"\n");
sb.append(NUM_RETRIES + ": " + numRetries() +"\n");
- sb.append(USE_LEGACY_CLIENT + ": " + useLegacyClient().map(Object::toString).orElse("<empty>") +"\n");
return sb.toString();
}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
index f690e767194..39b24799002 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -24,35 +24,24 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
public class VespaStorageTest {
@Test
- public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception {
- Configuration conf = new HdfsConfiguration();
- conf.set(VespaConfiguration.DATA_FORMAT, "xml");
- assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf);
- }
-
-
- @Test
public void requireThatPremadeOperationsFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_operations.pig");
}
-
@Test
public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig");
}
-
@Test
public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig");
}
@Test
- public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception {
+ public void requireThatPremadeOperationsWithJsonLoaderFeedWithSllSucceeds() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString());
- conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString());
assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf);
}
@@ -61,19 +50,16 @@ public class VespaStorageTest {
assertAllDocumentsOk("src/test/pig/feed_create_operations.pig");
}
-
@Test
public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig");
}
-
@Test
public void requireThatFeedVisitDataSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_visit_data.pig");
}
-
private PigServer setup(String script, Configuration conf) throws Exception {
if (conf == null) {
conf = new HdfsConfiguration();
@@ -92,12 +78,10 @@ public class VespaStorageTest {
return ps;
}
-
private void assertAllDocumentsOk(String script) throws Exception {
assertAllDocumentsOk(script, null);
}
-
private void assertAllDocumentsOk(String script, Configuration conf) throws Exception {
PigServer ps = setup(script, conf);
List<ExecJob> jobs = ps.executeBatch();