scala - Filter Spark DataFrame with row field that is an array of strings


Using Spark 1.5, Scala 2.10.6

I'm trying to filter a DataFrame via a field "tags" that is an array of strings. I'm looking for the rows that have the tag 'private'.

    val report = df.select("*")
      .where(df("tags").contains("private"))

I'm getting:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'contains(tags, private)' due to data type mismatch: argument 1 requires string type, however, 'tags' is of array type.;
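The message comes from `Column.contains`, which Spark treats as a string predicate (a substring test), so the analyzer rejects an `array<string>` argument. Even on a string column it would answer a different question than "is 'private' one of the tags". A minimal plain-Scala sketch of the two meanings, with no Spark dependency; `ContainsSemantics` and its methods are illustrative names only:

```scala
// Plain-Scala illustration of two different meanings of "contains".
// Illustrative names only; no Spark involved.
object ContainsSemantics {
  // Substring test: the semantics Column.contains has on string columns.
  def substringMatch(s: String, needle: String): Boolean =
    s.contains(needle)

  // Element-membership test: the semantics wanted for an array of tags.
  def elementMatch(tags: Seq[String], needle: String): Boolean =
    tags.contains(needle)

  def main(args: Array[String]): Unit = {
    println(substringMatch("not-private", "private"))       // true: substring found
    println(elementMatch(Seq("not-private"), "private"))    // false: no element equals "private"
    println(elementMatch(Seq("red", "private"), "private")) // true
  }
}
```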

Is the filter method better suited?

Update:

The data is coming from the Cassandra adapter, but a minimal example that shows what I'm trying to do, and gets the above error, is:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, SQLContext}

    def testData(sc: SparkContext): DataFrame = {
      val stringRDD = sc.parallelize(Seq(
        """{ "name": "ed", "tags": ["red", "private"] }""",
        """{ "name": "fred", "tags": ["public", "blue"] }"""))
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._
      sqlContext.read.json(stringRDD)
    }

    def run(sc: SparkContext): Unit = {
      val df1 = testData(sc)
      df1.show()
      // Fails: Column.contains expects a string column, not array<string>
      val report = df1.select("*")
        .where(df1("tags").contains("private"))
      report.show()
    }

Update: the tags array can be of any length, and the 'private' tag can be in any position.

Update: one solution that works is a UDF:

    import scala.collection.mutable
    import org.apache.spark.sql.functions.udf

    val filterPriv = udf { (tags: mutable.WrappedArray[String]) => tags.contains("private") }
    val report = df1.filter(filterPriv(df1("tags")))
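The function passed to `udf` is ordinary Scala, so its logic can be checked without Spark. A minimal sketch, assuming the tag should be a parameter and that a row may carry a null `tags` value (assumed here to reach a Scala UDF as `null`); `TagPredicate` and `hasTag` are hypothetical names, and the rows mirror the sample data used in this thread:

```scala
// Hypothetical, Spark-free version of the UDF body so the logic can be tested locally.
object TagPredicate {
  // Null-safe membership test; a null tags value is treated as "no tags".
  def hasTag(tag: String)(tags: Seq[String]): Boolean =
    tags != null && tags.contains(tag)

  // Sample rows mirroring the JSON test data.
  val rows: Seq[(String, Seq[String])] = Seq(
    "ned"    -> Seq("blue", "big", "private"),
    "albert" -> Seq("private", "lumpy"),
    "zed"    -> Seq("big", "private", "square"),
    "jed"    -> Seq("green", "small", "round"),
    "ed"     -> Seq("red", "private"),
    "fred"   -> Seq("public", "blue"))

  def main(args: Array[String]): Unit = {
    val isPrivate: Seq[String] => Boolean = hasTag("private")
    // Seq.contains ignores array length and element position.
    val names = rows.collect { case (name, tags) if isPrivate(tags) => name }
    println(names.mkString(", ")) // ned, albert, zed, ed
    println(isPrivate(null))      // false
  }
}
```

Inside Spark the same body could be wrapped as `udf((tags: Seq[String]) => TagPredicate.hasTag("private")(tags))`; a `Seq[String]` parameter should accept the array column just as `mutable.WrappedArray[String]` does, since the latter is a `Seq`.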

I think if you use where(array_contains(...)) it will work. Here's my result:

    scala> import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext

    scala> import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.DataFrame

    scala> def testData(sc: SparkContext): DataFrame = {
         |     val stringRDD = sc.parallelize(Seq
         |      ("""{ "name": "ned", "tags": ["blue", "big", "private"] }""",
         |       """{ "name": "albert", "tags": ["private", "lumpy"] }""",
         |       """{ "name": "zed", "tags": ["big", "private", "square"] }""",
         |       """{ "name": "jed", "tags": ["green", "small", "round"] }""",
         |       """{ "name": "ed", "tags": ["red", "private"] }""",
         |       """{ "name": "fred", "tags": ["public", "blue"] }"""))
         |     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
         |     import sqlContext.implicits._
         |     sqlContext.read.json(stringRDD)
         |   }
    testData: (sc: org.apache.spark.SparkContext)org.apache.spark.sql.DataFrame

    scala> val df = testData(sc)
    df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

    scala> val report = df.select("*").where(array_contains(df("tags"), "private"))
    report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

    scala> report.show
    +------+--------------------+
    |  name|                tags|
    +------+--------------------+
    |   ned|[blue, big, private]|
    |albert|    [private, lumpy]|
    |   zed|[big, private, sq...|
    |    ed|      [red, private]|
    +------+--------------------+
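For use outside the shell, a minimal standalone sketch, assuming Spark 1.5's `SQLContext` API and a local master; `FilterPrivateTags`, `privateRows` and `sampleData` are hypothetical names. In compiled code `array_contains` has to be imported from `org.apache.spark.sql.functions` (the shell may pre-import it). On a DataFrame, `where` is an alias for `filter`, so either method gives the same result:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.array_contains

object FilterPrivateTags {
  // Keep only rows whose tags array has an element equal to "private".
  // df.where(...) with the same condition is equivalent.
  def privateRows(df: DataFrame): DataFrame =
    df.filter(array_contains(df("tags"), "private"))

  // Small inline data set in the same shape as the question's JSON.
  def sampleData(sc: SparkContext, sqlContext: SQLContext): DataFrame =
    sqlContext.read.json(sc.parallelize(Seq(
      """{ "name": "ed", "tags": ["red", "private"] }""",
      """{ "name": "fred", "tags": ["public", "blue"] }""")))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("filter-private-tags").setMaster("local[*]"))
    try {
      val sqlContext = new SQLContext(sc)
      privateRows(sampleData(sc, sqlContext)).show()
    } finally {
      sc.stop()
    }
  }
}
```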

Note that it works if you write where(array_contains(df("tags"), "private")), but if you write where(df("tags").array_contains("private")) (more directly analogous to what you wrote originally), it fails with "array_contains is not a member of org.apache.spark.sql.Column". Looking at the source code for Column, I see there's some code to handle contains (constructing a Contains instance for it) but not array_contains. Maybe that's an oversight.
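If the method-call style reads better, it can be added locally with an implicit class that forwards to `functions.array_contains`. A minimal sketch; `ArrayColumnSyntax`, `ArrayColumnOps` and `arrayContains` are hypothetical names, not part of Spark's API:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.array_contains

object ArrayColumnSyntax {
  // Hypothetical extension so one can write df("tags").arrayContains("private").
  implicit class ArrayColumnOps(val col: Column) extends AnyVal {
    // Forwards to the built-in array_contains(column, value) function.
    def arrayContains(value: Any): Column = array_contains(col, value)
  }
}

// Usage (assuming a DataFrame df with an array<string> column "tags"):
//   import ArrayColumnSyntax._
//   val report = df.where(df("tags").arrayContains("private"))
```

This only adds syntax; the generated expression is the same one the `array_contains(...)` call builds.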

