python - How to make the first row the header when reading a file in PySpark and converting it to a Pandas Dataframe -


I am reading a file in PySpark and forming an RDD of it, then converting it to a normal dataframe and then to a pandas dataframe. The issue I am having is that there is a header row in the input file, and I want to make this the header of the dataframe columns, but it is read in as an additional row and not as the header. My current code:

def extract(line):
    return line

input_file = sc.textFile('file1.txt').zipWithIndex().filter(lambda (line, rownum): rownum >= 0).map(lambda (line, rownum): line)

input_data = (input_file
    .map(lambda line: line.split(";"))
    .filter(lambda line: len(line) >= 0)
    .map(extract))  # map to tuples

df_normal = input_data.toDF()
df = df_normal.toPandas()

Now when I look at df, the header row of the text file has become the first row of the dataframe, and there is an additional header in df with 0, 1, 2, ... as the header. How can I make the first row the header?

There are a couple of ways to do that, depending on the exact structure of your data. Since you do not give any details, I'll try to show it using a datafile nyctaxisub.csv, which you can download.

If your file is in .csv format, you should use the relevant spark-csv package, provided by Databricks. No need to download it explicitly, just run pyspark as follows:

$ pyspark --packages com.databricks:spark-csv_2.10:1.3.0 

and then

>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)

>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv',
                          format='com.databricks.spark.csv',
                          header='true',
                          inferSchema='true')

>>> df.count()
249999

The file has 250,000 rows including the header, so 249,999 is the correct number of actual records. Here is the schema, as inferred automatically by the package:

>>> df.dtypes
[('_id', 'string'),
 ('_rev', 'string'),
 ('dropoff_datetime', 'string'),
 ('dropoff_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'string'),
 ('pickup_latitude', 'double'),
 ('pickup_longitude', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]
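Since the question ultimately asks for a pandas dataframe, the result of the spark-csv load above can be converted directly. A minimal follow-up sketch (not part of the original answer), using the df already loaded:

>>> pandas_df = df.toPandas()   # column names and inferred types carry over from the CSV header
>>> pandas_df.shape             # the 249,999 records and the 16 columns shown above
(249999, 16)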

You can find more details in the relevant blog post.

If, for whatever reason, you cannot use the spark-csv package, you'll have to subtract the first row from the data and then use it to construct your schema. Here is the general idea; you can again find the full code with details in another blog post of mine:

>>> from pyspark.sql.types import *
>>> taxiFile = sc.textFile("file:///home/ctsats/datasets/bdu_spark/nyctaxisub.csv")
>>> taxiFile.count()
250000
>>> taxiFile.take(5)
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
 u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000e+001,-7.40144200000000e+001,"a93d1f7f8998ffb75eef477eb6077516","68bc16a99e915e44ada7e639b4dd5f59",2,"2013-01-11 21:48:00",+4.06760670000000e+001,-7.39810790000000e+001,1,,+4.08000000000000e+000,900,"vts"',
 u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000e+001,-7.39467470000000e+001,"64ce1b03fde343bb8dfb512123a525a4","60150aa39b2f654ed6f0c3af8174a48a",1,"2013-01-11 04:07:00",+4.07280540000000e+001,-7.40020370000000e+001,1,,+8.53000000000000e+000,1260,"vts"',
 u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000e+001,-7.39942860000000e+001,"2d73b0c44f1699c67ab8ae322433bdb7","6f907bc9a85b7034c8418a24a0a75489",5,"2013-01-11 21:46:00",+4.07577480000000e+001,-7.39649810000000e+001,1,,+3.01000000000000e+000,960,"vts"',
 u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000e+001,-7.39544600000000e+001,"e90018250f0a009433f03bd1e4a4ce53","1affd48cc07161da651625b562fe4d06",5,"2013-01-11 09:44:00",+4.07308080000000e+001,-7.39928280000000e+001,1,,+3.64000000000000e+000,1140,"vts"']

# Construct the schema from the header:
>>> header = taxiFile.first()
>>> header
u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'
>>> schemaString = header.replace('"','')  # get rid of the double-quotes
>>> schemaString
u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
>>> schema = StructType(fields)

# Subtract the header and use the above-constructed schema:
>>> taxiHeader = taxiFile.filter(lambda l: "_id" in l)  # taxiHeader needs to be an RDD - the string constructed above will not do the job
>>> taxiHeader.collect()  # for inspection purposes only
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']
>>> taxiNoHeader = taxiFile.subtract(taxiHeader)
>>> taxi_df = taxiNoHeader.toDF(schema)   # Spark dataframe
>>> import pandas as pd
>>> taxi_df = taxi_df.toPandas()          # pandas dataframe
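As a side note (not part of the original answer): since the code in the question already uses zipWithIndex, another way to drop just the header line, without subtract, is to keep only the lines whose index is greater than zero and use the first line for the column names. A rough sketch, assuming the same semicolon-delimited file1.txt from the question:

# Sketch only - assumes the semicolon-delimited 'file1.txt' from the question
rdd = sc.textFile('file1.txt').zipWithIndex()     # (line, index) pairs
header = rdd.first()[0].split(";")                # first line holds the column names
data = (rdd.filter(lambda x: x[1] > 0)            # drop the header row (index 0)
           .map(lambda x: x[0].split(";")))
df_normal = data.toDF(header)                     # use the header fields as column names
df = df_normal.toPandas()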

For brevity, with the subtract-based code above all the columns end up being of type string, but in the blog post I show in detail and explain how you can further refine the desired data types (and names) for specific fields.
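As a rough illustration of that refinement (my own sketch, not the blog post's code), string columns can be cast to more appropriate types on the Spark dataframe, i.e. before the final toPandas() call above:

>>> # Illustrative only: cast two of the string columns to numeric types
>>> from pyspark.sql.functions import col
>>> from pyspark.sql.types import DoubleType, IntegerType
>>> taxi_df = taxiNoHeader.toDF(schema)   # Spark dataframe, all columns as strings
>>> taxi_df = taxi_df.withColumn('trip_distance', col('trip_distance').cast(DoubleType()))
>>> taxi_df = taxi_df.withColumn('passenger_count', col('passenger_count').cast(IntegerType()))
>>> taxi_df = taxi_df.toPandas()          # pandas dataframe with refined dtypes for these fields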

