python - How to make the first row the header when reading a file in PySpark and converting it to a Pandas DataFrame
I am reading a file in PySpark and forming an RDD out of it. I then convert it to a normal DataFrame and then to a Pandas DataFrame. The issue I am having is that there is a header row in my input file, and I want to make this the header of the DataFrame columns, but it is read in as an additional data row and not as the header. Here is my current code:
def extract(line):
    return line

input_file = sc.textFile('file1.txt').zipWithIndex().filter(lambda (line, rownum): rownum >= 0).map(lambda (line, rownum): line)

input_data = (input_file
    .map(lambda line: line.split(";"))
    .filter(lambda line: len(line) >= 0)
    .map(extract))  # Map tuples

df_normal = input_data.toDF()
df = df_normal.toPandas()
Now when I look at df, the header row of the text file becomes the first row of the DataFrame, and df gets an additional header with 0, 1, 2... as the column names. How can I make the first row the header?
There are a couple of ways to do that, depending on the exact structure of your data. Since you do not give any details, I'll try to show it using the datafile nyctaxisub.csv, which you can download.
If your file is in CSV format, you should use the relevant spark-csv package, provided by Databricks. There is no need to download it explicitly, just run pyspark as follows:
$ pyspark --packages com.databricks:spark-csv_2.10:1.3.0
and then
>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv',
                              format='com.databricks.spark.csv',
                              header='true',
                              inferSchema='true')
>>> df.count()
249999
The file has 250,000 rows including the header, so 249,999 is the correct number of actual records. Here is the schema, as inferred automatically by the package:
>>> df.dtypes
[('_id', 'string'),
 ('_rev', 'string'),
 ('dropoff_datetime', 'string'),
 ('dropoff_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'string'),
 ('pickup_latitude', 'double'),
 ('pickup_longitude', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]
You can see more details in the relevant blog post.
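To complete the last step asked about in the question with this approach, here is a minimal sketch (assuming the df loaded above; the pandas_df name is just for illustration). The column names carry over from the inferred schema when converting to Pandas, so the header never shows up as a data row:

>>> pandas_df = df.toPandas()        # column names come from the schema, not from a data row
>>> list(pandas_df.columns)[:4]
['_id', '_rev', 'dropoff_datetime', 'dropoff_latitude']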
If, for whatever reason, you cannot use the spark-csv package, you'll have to subtract the first row from the data and then use it to construct your schema. Here is the general idea, and you can again find a full example with code details in another blog post of mine:
>>> taxiFile = sc.textFile("file:///home/ctsats/datasets/bdu_spark/nyctaxisub.csv")
>>> taxiFile.count()
250000
>>> taxiFile.take(5)
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
 u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000e+001,-7.40144200000000e+001,"a93d1f7f8998ffb75eef477eb6077516","68bc16a99e915e44ada7e639b4dd5f59",2,"2013-01-11 21:48:00",+4.06760670000000e+001,-7.39810790000000e+001,1,,+4.08000000000000e+000,900,"vts"',
 u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000e+001,-7.39467470000000e+001,"64ce1b03fde343bb8dfb512123a525a4","60150aa39b2f654ed6f0c3af8174a48a",1,"2013-01-11 04:07:00",+4.07280540000000e+001,-7.40020370000000e+001,1,,+8.53000000000000e+000,1260,"vts"',
 u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000e+001,-7.39942860000000e+001,"2d73b0c44f1699c67ab8ae322433bdb7","6f907bc9a85b7034c8418a24a0a75489",5,"2013-01-11 21:46:00",+4.07577480000000e+001,-7.39649810000000e+001,1,,+3.01000000000000e+000,960,"vts"',
 u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000e+001,-7.39544600000000e+001,"e90018250f0a009433f03bd1e4a4ce53","1affd48cc07161da651625b562fe4d06",5,"2013-01-11 09:44:00",+4.07308080000000e+001,-7.39928280000000e+001,1,,+3.64000000000000e+000,1140,"vts"']

# Construct the schema from the header
>>> header = taxiFile.first()
>>> header
u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'
>>> schemaString = header.replace('"', '')  # get rid of the double-quotes
>>> schemaString
u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
>>> schema = StructType(fields)

# Subtract the header and use the above-constructed schema:
>>> taxiHeader = taxiFile.filter(lambda l: "_id" in l)  # taxiHeader needs to be an RDD - the string constructed above will not do the job
>>> taxiHeader.collect()  # for inspection purposes only
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']
>>> taxiNoHeader = taxiFile.subtract(taxiHeader).map(lambda line: line.split(','))  # split each record into its fields so it matches the schema
>>> taxi_df = taxiNoHeader.toDF(schema)  # Spark DataFrame
>>> import pandas as pd
>>> taxi_df = taxi_df.toPandas()  # Pandas DataFrame
For brevity, here all the columns end up being of type string, but in the blog post I show and explain in detail how you can further refine the desired data types (and names) for specific fields.
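Relating this back to the code in the question, here is a minimal sketch under the assumption that file1.txt is semicolon-delimited (as your split(";") suggests). It keeps your zipWithIndex approach but actually drops the header row (your filter with rownum >= 0 keeps every row) and passes the parsed header fields to toDF as column names:

>>> raw = sc.textFile('file1.txt')
>>> header_fields = raw.first().split(';')    # column names taken from the header row
>>> data = (raw.zipWithIndex()
...            .filter(lambda x: x[1] > 0)    # drop the header (index 0); rownum >= 0 would keep it
...            .map(lambda x: x[0].split(';')))
>>> df_normal = data.toDF(header_fields)      # columns named after the header instead of 0, 1, 2, ...
>>> df = df_normal.toPandas()

The column names then come from the header line itself, so nothing extra is needed on the Pandas side.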