org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized

Apache's JIRA Issue Tracker | yangping wu | 2 years ago
  1. 0

    I am reading data from Kafka using the createDirectStream method and saving the received logs to MySQL. The code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply("") } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recover the program from the checkpoint, I encounter an exception: {code} Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) {code} Not sure if this is a bug or a feature, but it's not obvious, so I wanted to create a JIRA to make sure we document this behavior. Can someone help me understand the cause? Thank you.

    Apache's JIRA Issue Tracker | 2 years ago | yangping wu
    org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized
  2. 0

    Error in starting Spark streaming context

    Stack Overflow | 1 year ago | Sadaf
    org.apache.spark.SparkException: org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been initialized
  3. 0

    Apache Spark Developers List - DStream not initialized SparkException

    nabble.com | 4 months ago
    org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@15ce2c0 has not been initialized
  4. Speed up your debug routine!

    Automated exception search integrated into your IDE

  5. 0

    Apache Spark Developers List - DStream not initialized SparkException

    nabble.com | 4 months ago
    org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@15ce2c0 has not been initialized
  6. 0

    Apache Spark - ReducedWindowedDStream has not been initialized

    Stack Overflow | 1 year ago | alarius
    org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ReducedWindowedDStream@65600fb3 has not been initialized

  1. tyson925 3 times, last 7 months ago
3 unregistered visitors
Not finding the right solution?
Take a tour to get the most out of Samebug.

Tired of useless tips?

Automated exception search integrated into your IDE

Root Cause Analysis

  1. org.apache.spark.SparkException

    org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized

    at org.apache.spark.streaming.dstream.DStream.isTimeValid()
  2. Spark Project Streaming
    DStream$$anonfun$getOrCompute$1.apply
    1. org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
    2. org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
    3. org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    4. org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    4 frames
  3. Scala
    Option.orElse
    1. scala.Option.orElse(Option.scala:257)
    1 frame
  4. Spark Project Streaming
    DStreamGraph$$anonfun$1.apply
    1. org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    2. org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    3. org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    4. org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    4 frames
  5. Scala
    AbstractTraversable.flatMap
    1. scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    2. scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    3. scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    4. scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    5. scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    6. scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    6 frames
  6. Spark Project Streaming
    JobGenerator$$anonfun$restart$4.apply
    1. org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    2. org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
    3. org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
    3 frames
  7. Scala
    ArrayOps$ofRef.foreach
    1. scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    2. scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    2 frames
  8. Spark Project Streaming
    StreamingContext.start
    1. org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
    2. org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
    3. org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
    4. org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
    4 frames
  9. logstatstreaming
    UserChannelTodb.main
    1. logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57)
    2. logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala)
    2 frames
  10. Java RT
    Method.invoke
    1. sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    2. sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    3. sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    4. java.lang.reflect.Method.invoke(Method.java:597)
    4 frames
  11. Spark
    SparkSubmit.main
    1. org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    2. org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    3. org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    4. org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    5. org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    5 frames