Scala - filter Spark DataFrame with a row field that is an array of strings
Using Spark 1.5, Scala 2.10.6.

I'm trying to filter a DataFrame on the field "tags", which is an array of strings. I'm looking for the rows that have the tag 'private'.

val report = df.select("*")
  .where(df("tags").contains("private"))

I'm getting:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'contains(tags, private)' due to data type mismatch: argument 1 requires string type, however, 'tags' is of array<string> type.;

Is the filter method better suited?
Update:

The data is coming from a Cassandra adapter, but a minimal example that shows what I'm trying to do and produces the above error is:
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame

def testData(sc: SparkContext): DataFrame = {
  val stringRDD = sc.parallelize(Seq(
    """{ "name": "ed", "tags": ["red", "private"] }""",
    """{ "name": "fred", "tags": ["public", "blue"] }"""))
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._
  sqlContext.read.json(stringRDD)
}

def run(sc: SparkContext) {
  val df1 = testData(sc)
  df1.show()
  val report = df1.select("*")
    .where(df1("tags").contains("private"))
  report.show()
}

Update: the tags array can be any length, and the 'private' tag can be in any position.
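To pin down the intended semantics without Spark, here is a plain-Scala model of the filter over local collections. `Record`, `TagFilterModel`, and the sample data are hypothetical. It contrasts element membership (`Seq.contains`), which is what the question asks for, with a substring test (`String.contains`), which is the kind of match `Column.contains` performs and the reason it insists on a string argument.

```scala
// Hypothetical stand-in for one DataFrame row: a name plus a tags array.
case class Record(name: String, tags: Seq[String])

object TagFilterModel {
  // Hypothetical sample data, including a tag that merely contains "private".
  val records: Seq[Record] = Seq(
    Record("ed", Seq("red", "private")),
    Record("fred", Seq("public", "blue")),
    Record("ted", Seq("privateer"))
  )

  // Element membership: true if any element equals `tag`,
  // regardless of the array's length or the tag's position.
  def withTag(rs: Seq[Record], tag: String): Seq[String] =
    rs.filter(_.tags.contains(tag)).map(_.name)

  // Substring match on a flattened string, shown only for contrast:
  // "privateer" wrongly counts as a match here.
  def withSubstring(rs: Seq[Record], tag: String): Seq[String] =
    rs.filter(_.tags.mkString(",").contains(tag)).map(_.name)

  def main(args: Array[String]): Unit = {
    println(withTag(records, "private"))       // List(ed)
    println(withSubstring(records, "private")) // List(ed, ted)
  }
}
```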
Update: one solution that works is a UDF:

import scala.collection.mutable
import org.apache.spark.sql.functions.udf

val filterPriv = udf { (tags: mutable.WrappedArray[String]) => tags.contains("private") }
val report = df1.filter(filterPriv(df1("tags")))
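A minimal sketch of the same UDF approach generalized to any tag, assuming the Spark 1.5 `SQLContext` API and a local master. `hasTag`, `namesWithTag`, `run`, and `UdfTagFilter` are hypothetical names. The UDF parameter is declared as `Seq[String]` rather than `mutable.WrappedArray[String]`: as far as I know, Spark hands an `array<string>` value to a Scala UDF as a `WrappedArray`, which is a `Seq`, so the more general type should work as well. Since `where` is an alias of `filter` on a DataFrame, either method can take the resulting Column.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.udf

object UdfTagFilter {
  // Hypothetical helper: builds a UDF that tests array membership for `tag`.
  // A null tags array is treated as "no tags" rather than throwing.
  def hasTag(tag: String) = udf { (tags: Seq[String]) =>
    tags != null && tags.contains(tag)
  }

  // Hypothetical helper: names of the rows whose tags contain `tag`, sorted.
  def namesWithTag(df: DataFrame, tag: String): Seq[String] =
    df.filter(hasTag(tag)(df("tags")))
      .select("name")
      .collect()
      .map(_.getString(0))
      .sorted
      .toSeq

  // Builds a small DataFrame from JSON strings, as in the question.
  def run(sc: SparkContext): Seq[String] = {
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json(sc.parallelize(Seq(
      """{ "name": "ed", "tags": ["red", "private"] }""",
      """{ "name": "fred", "tags": ["public", "blue"] }""",
      """{ "name": "ned", "tags": ["blue", "big", "private"] }""")))
    namesWithTag(df, "private")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("udf-tag-filter"))
    try println(run(sc)) finally sc.stop()
  }
}
```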
I think if you use where(array_contains(...)) it will work. Here's my result:
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame

scala> def testData (sc: SparkContext): DataFrame = {
     |   val stringRDD = sc.parallelize(Seq
     |     ("""{ "name": "ned", "tags": ["blue", "big", "private"] }""",
     |     """{ "name": "albert", "tags": ["private", "lumpy"] }""",
     |     """{ "name": "zed", "tags": ["big", "private", "square"] }""",
     |     """{ "name": "jed", "tags": ["green", "small", "round"] }""",
     |     """{ "name": "ed", "tags": ["red", "private"] }""",
     |     """{ "name": "fred", "tags": ["public", "blue"] }"""))
     |   val sqlContext = new org.apache.spark.sql.SQLContext(sc)
     |   import sqlContext.implicits._
     |   sqlContext.read.json(stringRDD)
     | }
testData: (sc: org.apache.spark.SparkContext)org.apache.spark.sql.DataFrame

scala>
     | val df = testData (sc)
df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> val report = df.select ("*").where (array_contains (df("tags"), "private"))
report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> report.show
+------+--------------------+
|  name|                tags|
+------+--------------------+
|   ned|[blue, big, private]|
|albert|    [private, lumpy]|
|   zed|[big, private, sq...|
|    ed|      [red, private]|
+------+--------------------+

Note that it works if I write where(array_contains(df("tags"), "private")), but if I write where(df("tags").array_contains("private")) (more directly analogous to what you wrote originally), it fails with "array_contains is not a member of org.apache.spark.sql.Column". Looking at the source code for Column, I see there's code to handle contains (it constructs a Contains instance for that) but nothing for array_contains. Maybe that's an oversight.
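Building on the array_contains answer, here is a sketch of two variations, assuming Spark 1.5 (where `array_contains` is available in `org.apache.spark.sql.functions`): negating the test with `!` to find rows that lack the tag, and OR-ing several `array_contains` tests to match any of several tags. `anyTag`, `names`, `run`, and `ArrayContainsVariants` are hypothetical names, and the sample rows are a subset of the data above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Column, DataFrame, SQLContext}
import org.apache.spark.sql.functions.{array_contains, col}

object ArrayContainsVariants {
  // Hypothetical helper: true when the tags array holds at least one of `tags`.
  // Builds one array_contains test per tag and ORs them; needs at least one tag.
  def anyTag(tags: String*): Column =
    tags.map(t => array_contains(col("tags"), t)).reduce(_ || _)

  // Hypothetical helper: sorted names of the rows matching `cond`.
  def names(df: DataFrame, cond: Column): Seq[String] =
    df.where(cond).select("name").collect().map(_.getString(0)).sorted.toSeq

  def run(sc: SparkContext): (Seq[String], Seq[String], Seq[String]) = {
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json(sc.parallelize(Seq(
      """{ "name": "ed", "tags": ["red", "private"] }""",
      """{ "name": "fred", "tags": ["public", "blue"] }""",
      """{ "name": "jed", "tags": ["green", "small", "round"] }""")))
    val isPrivate = array_contains(col("tags"), "private")
    (names(df, isPrivate),                 // rows tagged "private"
     names(df, !isPrivate),                // rows without "private"
     names(df, anyTag("private", "blue"))) // rows tagged "private" or "blue"
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("array-contains-variants"))
    try println(run(sc)) finally sc.stop()
  }
}
```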