java.lang.NullPointerException

Apache's JIRA Issue Tracker | yangping wu | 2 years ago
tip
Did you know that we can give you better hits? Get more relevant results from Samebug’s stack trace search.
  1. 0

    I am reading data from Kafka using the createDirectStream method and saving the received logs to MySQL. The code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply("") } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recover the program from the checkpoint, I encounter an exception: {code} Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) {code} Not sure if this is a bug or a feature, but it's not obvious, so I wanted to create a JIRA to make sure we document this behavior. Can someone help me understand the reason? Thank you.

    Apache's JIRA Issue Tracker | 2 years ago | yangping wu
    java.lang.NullPointerException

    Root Cause Analysis

    1. java.lang.NullPointerException

      No message provided

      at org.apache.spark.sql.SQLConf.getConf()
    2. Spark Project SQL
      SQLContext.createDataFrame
      1. org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:217)
      2. org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:191)
      3. org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
      4. org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
      5. org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:381)
      5 frames
    3. logstatstreaming
      FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply
      1. logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:57)
      2. logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:40)
      2 frames
    4. Spark Project Streaming
      ForEachDStream$$anonfun$1.apply
      1. org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
      2. org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
      3. org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
      4. org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
      5. org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
      5 frames
    5. Scala
      Try$.apply
      1. scala.util.Try$.apply(Try.scala:161)
      1 frame
    6. Spark Project Streaming
      JobScheduler$JobHandler$$anonfun$run$1.apply
      1. org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
      2. org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
      3. org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
      4. org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
      4 frames
    7. Scala
      DynamicVariable.withValue
      1. scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      1 frame
    8. Spark Project Streaming
      JobScheduler$JobHandler.run
      1. org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
      1 frame
    9. Java RT
      Thread.run
      1. java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      2. java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      3. java.lang.Thread.run(Thread.java:619)
      3 frames