When I run a Dataflow job that reads from a Kinesis stream like this:
import apache_beam as beam
import logging
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.kinesis import InitialPositionInStream, ReadDataFromKinesis


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--aws_access_key", dest="aws_access_key", help="AWS Access Key used to connect to Kinesis stream."
    )
    parser.add_argument(
        "--aws_secret_key", dest="aws_secret_key", help="AWS Secret Key used to connect to Kinesis stream."
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    # Reading from Kinesis is unbounded, so the job has to run in streaming mode.
    pipeline_options.view_as(StandardOptions).streaming = True

    # The pipeline will be run on exiting the with block.
    with beam.Pipeline(options=pipeline_options) as p:
        p | 'ReadFromKinesis' >> ReadDataFromKinesis(
            stream_name="EventsDataStream",
            aws_access_key=known_args.aws_access_key,
            aws_secret_key=known_args.aws_secret_key,
            region="us-east-1",
            initial_position_in_stream=InitialPositionInStream.LATEST,
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
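The reproduction relies on `parse_known_args` to keep the two AWS flags for itself and hand every other flag to `PipelineOptions`. Below is a minimal stdlib-only sketch of that split, assuming the same two flag names; `split_args` is an illustrative helper (not part of Beam) and the flag values are placeholders, not real credentials.

```python
import argparse


def split_args(argv):
    """Separate the AWS credential flags from the flags forwarded to Beam.

    Only the two AWS flags are declared, so parse_known_args() returns
    every other flag (runner, streaming, project, ...) untouched.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--aws_access_key", dest="aws_access_key")
    parser.add_argument("--aws_secret_key", dest="aws_secret_key")
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


if __name__ == "__main__":
    # Placeholder values; substitute the real flags when launching the job.
    known, rest = split_args([
        "--aws_access_key", "AKIA_EXAMPLE",
        "--aws_secret_key", "SECRET_EXAMPLE",
        "--runner", "DataflowRunner",
        "--streaming",
    ])
    print(known.aws_access_key)  # AKIA_EXAMPLE
    print(rest)  # ['--runner', 'DataflowRunner', '--streaming']
```

Everything left in `rest` is what `PipelineOptions(pipeline_args)` receives in the reproduction.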
I get the following error:
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-10-27T14:28:50.348Z: JOB_MESSAGE_ERROR: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:241)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:241)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
at org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
at org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:241)
at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
at org.apache.beam.sdk.io.kinesis.KinesisReader.getSplitBacklogBytes(KinesisReader.java:172)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:983)
at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:431)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
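Read bottom-up, the `Caused by` section shows the order of events: `splitRestriction` emits a restriction, the runner immediately asks for its size (`invokeGetSize` → `getProgress`), and that calls `KinesisReader.getSplitBacklogBytes`, which throws. So the backlog is being queried while the restriction is still being split, possibly before the reader has read anything from the stream. The Python model below is a hypothetical sketch of that failure mode, not Beam's code: it assumes the backlog depends on state that is only populated once reading starts, and contrasts it with a guarded variant that reports an unknown backlog instead (Beam's Java `UnboundedSource.UnboundedReader` defines `BACKLOG_UNKNOWN = -1` for that case). The class and attribute names are illustrative only.

```python
from typing import Dict, Optional

# Mirrors the Java constant UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN.
BACKLOG_UNKNOWN = -1


class UnstartedBacklogReader:
    """Hypothetical reader: per-shard lag is only known after start()."""

    def __init__(self) -> None:
        # Assumption: this stays None until the reader starts consuming.
        self.shard_lag_bytes: Optional[Dict[str, int]] = None

    def start(self, lag: Dict[str, int]) -> None:
        self.shard_lag_bytes = dict(lag)

    def split_backlog_bytes(self) -> int:
        # Unguarded: fails if called before start(), analogous to the NPE.
        return sum(self.shard_lag_bytes.values())


class GuardedBacklogReader(UnstartedBacklogReader):
    """Same model, but reports an unknown backlog instead of failing."""

    def split_backlog_bytes(self) -> int:
        if self.shard_lag_bytes is None:
            return BACKLOG_UNKNOWN
        return sum(self.shard_lag_bytes.values())


if __name__ == "__main__":
    try:
        UnstartedBacklogReader().split_backlog_bytes()
    except AttributeError as err:
        # Python's counterpart of java.lang.NullPointerException here.
        print("unguarded:", type(err).__name__)
    guarded = GuardedBacklogReader()
    print("guarded, before start:", guarded.split_backlog_bytes())  # -1
    guarded.start({"shardId-000000000000": 512})
    print("guarded, after start:", guarded.split_backlog_bytes())  # 512
```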
I'm not sure how to debug this further, but as far as I can tell I followed the docs, so I'd expect this to work. Let me know if I can supply a better reproduction case.
Issue Priority
Priority: 2
Issue Component
Component: io-py-aws