java.lang.UnsupportedOperationException: Partition key predicate must include all partition key columns. Missing columns: environment,timeslicesecond

DataStax JIRA | JV | 1 year ago

    Discussion group post: https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/_PGqIg79CiQ

    I'm getting an exception (java.lang.UnsupportedOperationException: Partition key predicate must include all partition key columns. Missing columns:) when querying a table on a field that is both indexed and part of the primary key. It looks like the connector validates the query before pushing the filter down to C*, but does not account for the fact that the field is indexed. The connector goes through a two-step validation: first it checks whether the field is indexed, then it checks whether the field is part of the key, to ensure all key columns are specified. Even if I specify all the key fields, the second validation still fails, because the connector identifies the field as indexed in the first step and does not let the other key fields in the WHERE clause go through the second validation. Any help is appreciated. Stack trace and sample code below.

    Connector version: 2.10_1.5.0-M1
    Cassandra version: 2.1.5

    Stack trace:
{noformat}
Exception in thread "main" java.lang.UnsupportedOperationException: Partition key predicate must include all partition key columns. Missing columns: environment,timeslicesecond
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner.containsPartitionKey(CassandraRDDPartitioner.scala:112)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner.partitions(CassandraRDDPartitioner.scala:130)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:145)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:193)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
{noformat}

    Code to reproduce the use case:
{noformat}
val sc = new SparkContext(conf);
val cc = new CassandraSQLContext(sc)
val sqlContext = cc.asInstanceOf[SQLContext]
import sqlContext.implicits._

val cassandraConnector = CassandraConnector(conf);
cassandraConnector.withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute("""
    CREATE TABLE IF NOT EXISTS test.test_table (
      -- key
      gid TEXT,
      environment TEXT,
      timesliceSecond BIGINT,
      runId BIGINT,
      seqId BIGINT,
      notes TEXT,
      PRIMARY KEY ( ( gid, environment, timesliceSecond ), runId, seqId)
    ) WITH CLUSTERING ORDER BY (runId DESC, seqId DESC)
  """)
  session.execute("CREATE INDEX IF NOT EXISTS idx_runId_test_table on test.test_table(runId)");
  session.execute("CREATE INDEX IF NOT EXISTS idx_gid_test_table on test.test_table(gid)");
}

val gidRdd = cc.cassandraSql(s"SELECT * FROM test.test_table WHERE gid = 'AAB'")
gidRdd.explain()
gidRdd.show()
{noformat}

    DataStax JIRA | 1 year ago | JV
    java.lang.UnsupportedOperationException: Partition key predicate must include all partition key columns. Missing columns: environment,timeslicesecond
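
    For reference, a minimal sketch of the query shape the exception message asks for, restricting every partition key column from the schema above (gid, environment, timesliceSecond). It reuses the cc context from the reproduction code; the environment and timesliceSecond values are placeholders, and per the report this variant still trips the same validation while the secondary index on gid exists.
{noformat}
// Hypothetical query sketch: restricts all three partition key columns,
// which is what the exception message asks for. The environment and
// timesliceSecond values below are placeholders.
val fullKeyRdd = cc.cassandraSql(
  """SELECT * FROM test.test_table
    | WHERE gid = 'AAB'
    |   AND environment = 'prod'
    |   AND timesliceSecond = 1438387200""".stripMargin)
fullKeyRdd.explain()
fullKeyRdd.show()
{noformat}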

    Root Cause Analysis

    1. java.lang.UnsupportedOperationException

      Partition key predicate must include all partition key columns. Missing columns: environment,timeslicesecond

      at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner.containsPartitionKey()
    2. spark-cassandra-connector
      CassandraTableScanRDD.getPartitions
      1. com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner.containsPartitionKey(CassandraRDDPartitioner.scala:112)
      2. com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner.partitions(CassandraRDDPartitioner.scala:130)
      3. com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:145)
      3 frames
    3. Spark
      RDD$$anonfun$partitions$2.apply
      1. org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
      2. org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
      2 frames
    4. Scala
      Option.getOrElse
      1. scala.Option.getOrElse(Option.scala:120)
      1 frame
    5. Spark
      RDD$$anonfun$partitions$2.apply
      1. org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
      2. org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
      3. org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
      4. org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
      4 frames
    6. Scala
      Option.getOrElse
      1. scala.Option.getOrElse(Option.scala:120)
      1 frame
    7. Spark
      RDD$$anonfun$partitions$2.apply
      1. org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
      2. org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
      3. org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
      4. org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
      4 frames
    8. Scala
      Option.getOrElse
      1. scala.Option.getOrElse(Option.scala:120)
      1 frame
    9. Spark
      RDD$$anonfun$partitions$2.apply
      1. org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
      2. org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
      3. org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
      4. org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
      4 frames
    10. Scala
      Option.getOrElse
      1. scala.Option.getOrElse(Option.scala:120)
      1 frame
    11. Spark
      RDD.partitions
      1. org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
      1 frame
    12. Spark Project SQL
      DataFrame.show
      1. org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:193)
      2. org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
      3. org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
      4. org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
      5. org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
      6. org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
      7. org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
      8. org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
      9. org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
      10. org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
      11. org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
      12. org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
      13. org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
      13 frames
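
    For context, here is a minimal sketch of what the failing check at CassandraRDDPartitioner.containsPartitionKey appears to enforce, inferred only from the method name and the exception message in the trace; the names and structure below are assumptions, not the connector's actual code.
{noformat}
// Sketch only, inferred from the trace: if the pushed-down WHERE clause
// restricts any partition key column, every partition key column must be
// restricted, otherwise the missing ones are reported and the scan aborts.
def requireFullPartitionKey(whereColumns: Set[String],
                            partitionKey: Seq[String]): Unit = {
  if (whereColumns.exists(partitionKey.contains(_))) {
    val missing = partitionKey.filterNot(whereColumns.contains(_))
    if (missing.nonEmpty)
      throw new UnsupportedOperationException(
        "Partition key predicate must include all partition key columns. " +
          s"Missing columns: ${missing.mkString(",")}")
  }
}

// With only the indexed column pushed down (WHERE gid = 'AAB'), the other two
// partition key columns are reported missing, matching the message above:
// requireFullPartitionKey(Set("gid"), Seq("gid", "environment", "timeslicesecond"))
{noformat}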