summaryrefslogtreecommitdiffstats
path: root/vespa-hadoop
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2022-06-07 11:59:03 +0200
committergjoranv <gv@verizonmedia.com>2022-06-08 11:45:30 +0200
commit033d6494edc17b554ab841c3f5ea70bc5f8925de (patch)
treefc24f1564b91ee6e7009f4a92adb0981ffb92924 /vespa-hadoop
parent72e82db1739fd88a78aba7d55c7ee4ef7f953863 (diff)
Revert "Remove http client use"
This reverts commit a7fd13540d34de50ed3526576c62eebc476a1e1c.
Diffstat (limited to 'vespa-hadoop')
-rw-r--r--vespa-hadoop/pom.xml5
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java233
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java18
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java8
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java38
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java18
6 files changed, 318 insertions, 2 deletions
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml
index 1dde2d58610..0c724c95839 100644
--- a/vespa-hadoop/pom.xml
+++ b/vespa-hadoop/pom.xml
@@ -136,6 +136,11 @@
<!-- Vespa feeding dependencies -->
<dependency>
<groupId>com.yahoo.vespa</groupId>
+ <artifactId>vespa-http-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
<artifactId>vespa-feed-client</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
new file mode 100644
index 00000000000..6900c7dc82f
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
@@ -0,0 +1,233 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+import java.io.IOException;
+import java.io.StringReader;
+import java.time.Duration;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+/**
+ * {@link LegacyVespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints using vespa-http-client.
+ *
+ * @author lesters
+ * @deprecated Replaced by {@link VespaRecordWriter}
+ */
+@Deprecated
+public class LegacyVespaRecordWriter extends RecordWriter<Object, Object> {
+
+ private final Logger log = Logger.getLogger(getClass().getCanonicalName());
+
+ private boolean initialized = false;
+ private com.yahoo.vespa.http.client.FeedClient feedClient;
+ private final VespaCounters counters;
+ private final int progressInterval;
+
+ final VespaConfiguration configuration;
+
+ LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
+ this.counters = counters;
+ this.configuration = configuration;
+ this.progressInterval = configuration.progressInterval();
+ }
+
+
+ @Override
+ public void write(Object key, Object data) throws IOException, InterruptedException {
+ if (!initialized) {
+ initialize();
+ }
+
+ String doc = data.toString().trim();
+
+ // Parse data to find document id - if none found, skip this write
+ String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc)
+ : findDocIdFromXml(doc);
+ if (docId != null && docId.length() >= 0) {
+ feedClient.stream(docId, doc);
+ counters.incrementDocumentsSent(1);
+ } else {
+ counters.incrementDocumentsSkipped(1);
+ }
+
+ if (counters.getDocumentsSent() % progressInterval == 0) {
+ String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
+ counters.getDocumentsSent(),
+ counters.getDocumentsOk(),
+ counters.getDocumentsFailed(),
+ counters.getDocumentsSkipped());
+ log.info(progress);
+ }
+
+ }
+
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ if (feedClient != null) {
+ feedClient.close();
+ }
+ }
+
+ protected ConnectionParams.Builder configureConnectionParams() {
+ ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
+ connParamsBuilder.setDryRun(configuration.dryrun());
+ connParamsBuilder.setUseCompression(configuration.useCompression());
+ connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
+ connParamsBuilder.setMaxRetries(configuration.numRetries());
+ if (configuration.proxyHost() != null) {
+ connParamsBuilder.setProxyHost(configuration.proxyHost());
+ }
+ if (configuration.proxyPort() >= 0) {
+ connParamsBuilder.setProxyPort(configuration.proxyPort());
+ }
+ return connParamsBuilder;
+ }
+
+ protected FeedParams.Builder configureFeedParams() {
+ FeedParams.Builder feedParamsBuilder = new FeedParams.Builder();
+ feedParamsBuilder.setDataFormat(configuration.dataFormat());
+ feedParamsBuilder.setRoute(configuration.route());
+ feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs());
+ feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests());
+ feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis());
+ return feedParamsBuilder;
+ }
+
+ protected SessionParams.Builder configureSessionParams() {
+ SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder();
+ sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize());
+ sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2);
+ return sessionParamsBuilder;
+ }
+
+ private void initialize() {
+ if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) {
+ int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs());
+ log.info("VespaStorage: Delaying startup by " + delay + " ms");
+ try {
+ Thread.sleep(delay);
+ } catch (Exception e) {}
+ }
+
+ ConnectionParams.Builder connParamsBuilder = configureConnectionParams();
+ FeedParams.Builder feedParamsBuilder = configureFeedParams();
+ SessionParams.Builder sessionParams = configureSessionParams();
+
+ sessionParams.setConnectionParams(connParamsBuilder.build());
+ sessionParams.setFeedParams(feedParamsBuilder.build());
+
+ String endpoints = configuration.endpoint();
+ StringTokenizer tokenizer = new StringTokenizer(endpoints, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String endpoint = tokenizer.nextToken().trim();
+ sessionParams.addCluster(new Cluster.Builder().addEndpoint(
+ Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL().orElse(false))
+ ).build());
+ }
+
+ ResultCallback resultCallback = new ResultCallback(counters);
+ feedClient = com.yahoo.vespa.http.client.FeedClientFactory.create(sessionParams.build(), resultCallback);
+
+ initialized = true;
+ log.info("VespaStorage configuration:\n" + configuration.toString());
+ log.info(feedClient.getStatsAsJson());
+ }
+
+ private String findDocIdFromXml(String xml) {
+ try {
+ XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
+ while (eventReader.hasNext()) {
+ XMLEvent event = eventReader.nextEvent();
+ if (event.getEventType() == XMLEvent.START_ELEMENT) {
+ StartElement element = event.asStartElement();
+ String elementName = element.getName().getLocalPart();
+ if (VespaDocumentOperation.Operation.valid(elementName)) {
+ return element.getAttributeByName(QName.valueOf("documentid")).getValue();
+ }
+ }
+ }
+ } catch (XMLStreamException | FactoryConfigurationError e) {
+ // as json dude does
+ return null;
+ }
+ return null;
+ }
+
+ private String findDocId(String json) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ try(JsonParser parser = factory.createParser(json)) {
+ if (parser.nextToken() != JsonToken.START_OBJECT) {
+ return null;
+ }
+ while (parser.nextToken() != JsonToken.END_OBJECT) {
+ String fieldName = parser.getCurrentName();
+ parser.nextToken();
+ if (VespaDocumentOperation.Operation.valid(fieldName)) {
+ String docId = parser.getText();
+ return docId;
+ } else {
+ parser.skipChildren();
+ }
+ }
+ } catch (JsonParseException ex) {
+ return null;
+ }
+ return null;
+ }
+
+
+ class ResultCallback implements com.yahoo.vespa.http.client.FeedClient.ResultCallback {
+ final VespaCounters counters;
+
+ public ResultCallback(VespaCounters counters) {
+ this.counters = counters;
+ }
+
+ @Override
+ public void onCompletion(String docId, com.yahoo.vespa.http.client.Result documentResult) {
+ if (!documentResult.isSuccess()) {
+ counters.incrementDocumentsFailed(1);
+ StringBuilder sb = new StringBuilder();
+ sb.append("Problems with docid ");
+ sb.append(docId);
+ sb.append(": ");
+ List<com.yahoo.vespa.http.client.Result.Detail> details = documentResult.getDetails();
+ for (com.yahoo.vespa.http.client.Result.Detail detail : details) {
+ sb.append(detail.toString());
+ sb.append(" ");
+ }
+ log.warning(sb.toString());
+ return;
+ }
+ counters.incrementDocumentsOk(1);
+ }
+
+ }
+
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
index 42eb6293eee..66ab94574d9 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
@@ -10,16 +10,24 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
+import java.util.Objects;
import java.util.Properties;
+import java.util.logging.Logger;
+
+import static com.yahoo.vespa.http.client.config.FeedParams.DataFormat.XML_UTF8;
/**
* An output specification for writing to Vespa instances in a Map-Reduce job.
+ * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the
+ * actual feeding to Vespa.
*
* @author lesters
*/
@SuppressWarnings("rawtypes")
public class VespaOutputFormat extends OutputFormat {
+ private static final Logger log = Logger.getLogger(VespaOutputFormat.class.getName());
+
final Properties configOverride;
public VespaOutputFormat() {
@@ -34,10 +42,18 @@ public class VespaOutputFormat extends OutputFormat {
@Override
+ @SuppressWarnings("deprecation")
public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException {
VespaCounters counters = VespaCounters.get(context);
VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride);
- return new VespaRecordWriter(configuration, counters);
+ Boolean useLegacyClient = configuration.useLegacyClient().orElse(null);
+ if (Objects.equals(useLegacyClient, Boolean.TRUE) || configuration.dataFormat() == XML_UTF8) {
+ log.warning("Feeding with legacy client or XML will no longer be supported on Vespa 8. " +
+ "See https://docs.vespa.ai/en/vespa8-release-notes.html");
+ return new LegacyVespaRecordWriter(configuration, counters);
+ } else {
+ return new VespaRecordWriter(configuration, counters);
+ }
}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
index c450d7cdeef..6d6c3789835 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
@@ -7,6 +7,7 @@ import ai.vespa.feed.client.JsonFeeder;
import ai.vespa.feed.client.OperationParseException;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.http.client.config.FeedParams;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -84,11 +85,18 @@ public class VespaRecordWriter extends RecordWriter<Object, Object> {
private void initializeOnFirstWrite() {
if (initialized) return;
+ validateConfig();
useRandomizedStartupDelayIfEnabled();
feeder = createJsonStreamFeeder();
initialized = true;
}
+ private void validateConfig() {
+ if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) {
+ throw new IllegalArgumentException("Only JSON is support by this feed client implementation");
+ }
+ }
+
private void useRandomizedStartupDelayIfEnabled() {
if (!config.dryrun() && config.randomStartupSleepMs() > 0) {
int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs());
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
index 54be261fbe7..715546fe6fe 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hadoop.mapreduce.util;
+import com.yahoo.vespa.http.client.config.FeedParams;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
@@ -18,6 +19,7 @@ public class VespaConfiguration {
public static final String PROXY_SCHEME = "vespa.feed.proxy.scheme";
public static final String DRYRUN = "vespa.feed.dryrun";
public static final String USE_COMPRESSION = "vespa.feed.usecompression";
+ public static final String DATA_FORMAT = "vespa.feed.data.format";
public static final String PROGRESS_REPORT = "vespa.feed.progress.interval";
public static final String CONNECTIONS = "vespa.feed.connections";
public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size";
@@ -27,6 +29,7 @@ public class VespaConfiguration {
public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests";
public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms";
public static final String NUM_RETRIES = "vespa.feed.num.retries";
+ public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient";
private final Configuration conf;
private final Properties override;
@@ -36,82 +39,115 @@ public class VespaConfiguration {
this.override = override;
}
+
public static VespaConfiguration get(Configuration conf, Properties override) {
return new VespaConfiguration(conf, override);
}
+
public String endpoint() {
return getString(ENDPOINT);
}
+
public int defaultPort() {
return getInt(DEFAULT_PORT, 4080);
}
+
public Optional<Boolean> useSSL() {
String raw = getString(USE_SSL);
if (raw == null || raw.trim().isEmpty()) return Optional.empty();
return Optional.of(Boolean.parseBoolean(raw));
}
+
public String proxyHost() {
return getString(PROXY_HOST);
}
+
public int proxyPort() {
return getInt(PROXY_PORT, 4080);
}
+
public String proxyScheme() {
String raw = getString(PROXY_SCHEME);
if (raw == null) return "http";
return raw;
}
+
public boolean dryrun() {
return getBoolean(DRYRUN, false);
}
+
public boolean useCompression() {
return getBoolean(USE_COMPRESSION, true);
}
+
public int numConnections() {
return getInt(CONNECTIONS, 1);
}
+
public int throttlerMinSize() {
return getInt(THROTTLER_MIN_SIZE, 0);
}
+
public int queryConnectionTimeout() {
return getInt(QUERY_CONNECTION_TIMEOUT, 10000);
}
+
public String route() {
return getString(ROUTE);
}
+
public int maxSleepTimeMs() {
return getInt(MAX_SLEEP_TIME_MS, 10000);
}
+
public int maxInFlightRequests() {
return getInt(MAX_IN_FLIGHT_REQUESTS, 500);
}
+
public int randomStartupSleepMs() {
return getInt(RANDOM_STARTUP_SLEEP, 30000);
}
+
public int numRetries() {
return getInt(NUM_RETRIES, 100);
}
+
+ public FeedParams.DataFormat dataFormat() {
+ String format = getString(DATA_FORMAT);
+ if ("xml".equalsIgnoreCase(format)) {
+ return FeedParams.DataFormat.XML_UTF8;
+ }
+ return FeedParams.DataFormat.JSON_UTF8;
+ }
+
+
public int progressInterval() {
return getInt(PROGRESS_REPORT, 1000);
}
+ public Optional<Boolean> useLegacyClient() {
+ String raw = getString(USE_LEGACY_CLIENT);
+ if (raw == null || raw.trim().isEmpty()) return Optional.empty();
+ return Optional.of(Boolean.parseBoolean(raw));
+ }
+
public String getString(String name) {
if (override != null && override.containsKey(name)) {
return override.getProperty(name);
@@ -161,6 +197,7 @@ public class VespaConfiguration {
sb.append(PROXY_SCHEME + ": " + proxyScheme() + "\n");
sb.append(DRYRUN + ": " + dryrun() +"\n");
sb.append(USE_COMPRESSION + ": " + useCompression() +"\n");
+ sb.append(DATA_FORMAT + ": " + dataFormat() +"\n");
sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n");
sb.append(CONNECTIONS + ": " + numConnections() +"\n");
sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n");
@@ -170,6 +207,7 @@ public class VespaConfiguration {
sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n");
sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() +"\n");
sb.append(NUM_RETRIES + ": " + numRetries() +"\n");
+ sb.append(USE_LEGACY_CLIENT + ": " + useLegacyClient().map(Object::toString).orElse("<empty>") +"\n");
return sb.toString();
}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
index 39b24799002..f690e767194 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -24,24 +24,35 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
public class VespaStorageTest {
@Test
+ public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set(VespaConfiguration.DATA_FORMAT, "xml");
+ assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf);
+ }
+
+
+ @Test
public void requireThatPremadeOperationsFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_operations.pig");
}
+
@Test
public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig");
}
+
@Test
public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig");
}
@Test
- public void requireThatPremadeOperationsWithJsonLoaderFeedWithSllSucceeds() throws Exception {
+ public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString());
+ conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString());
assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf);
}
@@ -50,16 +61,19 @@ public class VespaStorageTest {
assertAllDocumentsOk("src/test/pig/feed_create_operations.pig");
}
+
@Test
public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig");
}
+
@Test
public void requireThatFeedVisitDataSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_visit_data.pig");
}
+
private PigServer setup(String script, Configuration conf) throws Exception {
if (conf == null) {
conf = new HdfsConfiguration();
@@ -78,10 +92,12 @@ public class VespaStorageTest {
return ps;
}
+
private void assertAllDocumentsOk(String script) throws Exception {
assertAllDocumentsOk(script, null);
}
+
private void assertAllDocumentsOk(String script, Configuration conf) throws Exception {
PigServer ps = setup(script, conf);
List<ExecJob> jobs = ps.executeBatch();