Issue
I'm attempting to upgrade the Apache Beam libraries from v2.19.0 to v2.37.0 (Java 8 & Maven), but have run into an issue with a breaking change that I'd appreciate some support with. Sorry this is quite a long one, I wanted to capture as much context as I could, but please shout if there's anything you'd like to dig into.
I'm using Beam inside GCP Dataflow to read data from BigQuery, then processing aggregates before I write the results back to BigQuery. I'm able to read from/write to BigQuery without issue, but after the upgrade my pipeline to calculate aggregates is failing at runtime, specifically a DoFn
I have written to sanitise the results returned from the Beam SqlTransform.query
command. I call this function within ParDo.of
to detect Double.MAX_VALUE
and Double.MIN_VALUE
values, as calling MIN/MAX aggregates in Beam SQL returns the Double min/max values when it encounters a NULL
value, rather than just returning NULL. I did try filtering the initial BigQuery raw data results, but this issue creeps in at the Beam SQL level.
There may be better ways to do this (I'm open to suggestions!). I've included a bunch of code snippets from my pipeline that I've tried to simplify, so apologies if there's anything obviously janky. Here's what I previously had before the library upgrade:
PCollection<Row> aggregates = inputCollection.apply(
"Generate Aggregates",
SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
)
.apply(ParDo.of(new HandleNullValues()));
I've included the HandleNullValues
definition at the bottom of this post, but it appears v2.21.0 introduced a breaking change whereby the coder inference was disabled for Beam Row types in this ticket. This change has caused the above code to fail with the following runtime error:
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Unable to return a default Coder for ParDo(HandleNullValues)/ParMultiDo(HandleNullValues).output [PCollection@83398426]. Correct one of the following root causes: [ERROR] No Coder has been manually specified; you may do so using .setCoder(). [ERROR] Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for a Beam Row. Please provide a schema instead using PCollection.setRowSchema. [ERROR] Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
I've followed the advice on the aforementioned JIRA ticket, plus a bunch of other examples I found online, but without much joy. I've tried applying setCoder(SerializableCoder.of(Row.class))
after the .apply(ParDo.of(new HandleNullValues()))
which fixes this error (though I'm not yet sure if it's just suppressed the error, or if it's actually working), but that changes causes another runtime error:
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Cannot call getSchema when there is no schema -> [Help 1] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Cannot call getSchema when there is no schema
This error is thrown further down my pipeline, when I perform a subsequent SqlTransform.query
to JOIN some results together.
PCollectionTuple.of(new TupleTag<Row>("Rows"), aggregates)
.and(new TupleTag<Row>("Experiments"), experiments)
.apply("Joining Aggregates to Experiments", SqlTransform.query(aggregateExperimentJoin()))
.apply(ParDo.of(new MapBeamRowsToBigQueryTableRows()))
.apply(BigQueryIO.writeTableRows()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.to(NestedValueProvider.of(options.getProjectId(),(SerializableFunction<String, String>) projectId -> projectId + ":daily_aggregates.experiments")));
I've verified the aggregates
collection is indeed missing a schema if I interrogate the hasSchema
property. The second experiments
PCollection above does have a row schema set though:
PCollection<Row> rawExperiments = rows.apply(
SqlTransform.query("select sessionId, experiments from PCOLLECTION")
);
PCollection<Row> experiments = rawExperiments.apply(ParDo.of(new CustomFunctions.ParseExperiments(bigQuerySchema)));
experiments.setRowSchema(bigQuerySchema);
I've also tried applying this coder at the pipeline level, with different variations on the following. But this also gives the same error:
CoderRegistry cr = pipeline.getCoderRegistry();
cr.registerCoderForClass(Row.class, RowCoder.of(bigQuerySchema));
cr.registerCoderForType(TypeDescriptors.rows(), RowCoder.of(bigQuerySchema));
The bigQuerySchema
object referenced above is the initial schema used to retrieve all raw data from BigQuery, though that part of the pipeline works fine, so potentially I need to pass the aggregatesSchema
object (see below) in to registerCoderForType
for the pipeline?
I then tried to set the row schema on aggregates
(which was another suggestion in the error above). I've confirmed that calling setCoder
is responsible for the previous Row
schema disappearing, where it had previously been set by the input PCollection (and also if I call setRowSchema
immediately before I call the DoFn
.
I've simplified the schema for succinctness in this post, but it's a subset of bigQuerySchema
with a few new fields (simple data types). Here's what I've tried, again with various combinations of where I call setCoder
and setRowSchema
(before apply()
and/or after).
Schema aggregatesSchema = Schema.builder()
.addNullableField("userId", FieldType.STRING)
.addNullableField("sessionId", FieldType.STRING)
.addNullableField("experimentsPerDay", FieldType.INT64)
.build();
PCollection<Row> aggregates = inputCollection.apply(
"Generate Aggregates",
SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
)
.apply(ParDo.of(new HandleNullValues()))
.setCoder(SerializableCoder.of(Row.class))
.setRowSchema(aggregatesSchema);
Unfortunately, this causes a third runtime error which I've not been able to figure out:
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException -> [Help 1] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException
The full call stack is at the bottom of this email, and I can see it originating from my HandleNullValues
DoFn
, but after that it disappears into the Beam libraries.
I'm at a loss as to which route is recommended, and how to proceed, as both coder and schema options are causing different issues.
Any help would be greatly appreciated, and thanks in advance!
The full DoFn
I've referred to is further below, but it's worth noting that just having an essentially empty DoFn
with both input and output of Beam Row
types causes the same issue:
public static class HandleNullValues extends DoFn<Row, Row> {
@ProcessElement
public void processElement(ProcessContext c) {
Row row = c.element();
c.output(row);
}
}
Here's the full implementation, if anyone can think of a better way to detect and replace NULL
values returned from Beam SQL:
public static class HandleNullValues extends DoFn<Row, Row> {
@ProcessElement
public void processElement(ProcessContext c) {
Row row = c.element();
List<String> fields = row.getSchema().getFieldNames();
Builder rowBuilder = Row.withSchema(row.getSchema());
for (String f: fields) {
Object value = row.getValue(f);
if (value != null && value instanceof Long) {
Long longVal = row.getInt64(f);
if (longVal == Long.MAX_VALUE || longVal == Long.MIN_VALUE) {
rowBuilder.addValue(null);
} else {
rowBuilder.addValue(value);
}
} else if (value != null && value instanceof Double) {
Double doubleVal = row.getDouble(f);
if (doubleVal == Double.MAX_VALUE || doubleVal == Double.MIN_VALUE) {
rowBuilder.addValue(null);
} else {
rowBuilder.addValue(value);
}
} else {
rowBuilder.addValue(value);
}
}
Row newRow = rowBuilder.build();
c.output(newRow);
}
}
And here's the full callstack from the setRowSchema
issue detailed above:
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException -> [Help 1] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:306) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81) at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127) at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294) at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192) at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105) at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960) at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293) at org.apache.maven.cli.MavenCli.main (MavenCli.java:196) at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke (Method.java:498) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282) at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406) at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347) Caused by: org.apache.maven.plugin.MojoExecutionException: An exception occured while executing the Java class. java.lang.IllegalStateException at org.codehaus.mojo.exec.ExecJavaMojo.execute (ExecJavaMojo.java:311) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137) at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:301) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165) at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81) at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127) at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294) at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192) at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105) at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960) at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293) at org.apache.maven.cli.MavenCli.main (MavenCli.java:196) at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke (Method.java:498) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282) at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406) at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347) Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:373) at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:341) at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:218) at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67) at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323) at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309) at com.example.dataflow.Pipeline.main (Pipeline.java:284) at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254) at java.lang.Thread.run (Thread.java:748) Caused by: java.lang.IllegalStateException at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState (Preconditions.java:491) at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate (RowCoderGenerator.java:314) at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode (Unknown Source) at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode (Unknown Source) at org.apache.beam.sdk.schemas.SchemaCoder.encode (SchemaCoder.java:124) at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136) at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:85) at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:69) at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:54) at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144) at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector. (MutationDetectors.java:118) at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49) at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115) at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305) at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:268) at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84) at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:416) at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:404) at com.example.dataflow.Pipeline$HandleNullValues.processElement (CustomFunctions.java:310)
Cheers!
Solution
For anyone wanting to know how I fixed this, there are several updates:
- Apache Beam v2.20+ introduced breaking changes to inferred schema and coder, meaning
setRowSchema(x)
must now be declared immediately after applying aParDo
function - The
HandleNullValues
function above is no longer needed, as a bug fix was issued for this erroneous behaviour in Apache Beam v2.29.0 withNULL
values being returned for empty aggregates - I did not have to override a
coder
myself, validatinggetSchema
at each stage of the pipeline and setting it when required, was all that I needed to check
I'd thoroughly recommend signing-up to the Apache Beam mailing list if you require help, they've been great.
Answered By - Jimmy
Answer Checked By - Marilyn (JavaFixing Volunteer)