Create DataFrames in Spark using Scala

joydeep bhattacharjee
Aug 15, 2018

I have recently started looking into Spark and Scala. Needless to say, they are amazing. In Spark you have the option of holding your data as DataFrames, a very popular format for representing data. Coming from the world of Python and pandas, I have become accustomed to thinking about data in terms of dataframes. (DataFrames were initially designed for developers who came from the SQL world.)

In Spark applications, data is often processed as RDDs and then saved to files in the Parquet format. You may want to convert your data structures to DataFrames instead. We are going to look at two ways of doing that.

Method 1:

In this case I have a file called Sentiment_analysis.csv. The file looks like this:

➜  spark-scala-tutorial git:(master) ✗ head Sentiment_analysis.csv
ItemID,Sentiment,SentimentText
1,0, is so sad for my APL friend.............
2,0, I missed the New Moon trailer...
3,1, omg its already 7:30 :O
4,0, .. Omgaga. Im sooo im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)...
5,0, i think mi bf is cheating on me!!! T_T
6,0, or i just worry too much?
7,1, Juuuuuuuuuuuuuuuuussssst Chillin!!
8,0, Sunny Again Work Tomorrow :-| TV Tonight
9,1, handed in my uniform today . i miss you already

The first line holds the headers: ItemID, Sentiment and SentimentText. The remaining lines are the individual records, as you would expect in a CSV file.

The file can first be read as a simple text file. Take the first line, which is the header, and then filter it out of the file object so that only the data is present when converting to a dataframe.

scala> val file = sc.textFile("Sentiment_analysis.csv")
file: org.apache.spark.rdd.RDD[String] = Sentiment_analysis.csv MapPartitionsRDD[5] at textFile at <console>:25
scala> val header = file.first()
header: String = ItemID,Sentiment,SentimentText
scala> val file1 = file.filter(row => row != header)
file1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at <console>:28
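
The filter above compares every row against the header string, so it would also drop any data row that happens to be identical to the header. As a rough alternative sketch, assuming a single input file whose first line lands in partition 0, the header can be dropped by position instead. The sample lines, the local SparkSession and the names lines and noHeader are made up for illustration; in spark-shell, spark and sc already exist.

```scala
import org.apache.spark.sql.SparkSession

// Local session so the sketch can run outside spark-shell (assumption).
val spark = SparkSession.builder().master("local[*]").appName("drop-header").getOrCreate()
val sc = spark.sparkContext

// Made-up lines standing in for sc.textFile("Sentiment_analysis.csv").
val lines = sc.parallelize(Seq(
  "ItemID,Sentiment,SentimentText",
  "1,0, is so sad",
  "2,0, I missed the New Moon trailer",
  "3,1, omg its already 7:30 :O"
), numSlices = 2)

// Drop only the first element of the first partition; other partitions pass through.
val noHeader = lines.mapPartitionsWithIndex { (index, rows) =>
  if (index == 0) rows.drop(1) else rows
}

val remaining = noHeader.collect().toList
```

This only skips one line per dataset, so it would not be enough if several files, each with its own header, were read together.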

My file is now prepped and can be converted to a dataframe. This is a bit tricky and needs a couple of transformations. First split each record on the delimiter, which is the comma "," here. Passing a limit of 3 to split keeps any commas that appear inside the text in the last field, so the array still has three elements. These are bound to a, b and c, and a and b are converted to integers using the toInt method. The resulting tuples are finally passed to the toDF method, together with the column names, to be converted into the final dataframe. (toDF is available here because spark-shell imports spark.implicits._ automatically.)

scala> val fileToDf = file1.map(_.split(",", 3)).map{case Array(a,b,c) => (a.toInt, b.toInt, c)}.toDF("ItemID","Sentiment","SentimentText")
fileToDf: org.apache.spark.sql.DataFrame = [ItemID: int, Sentiment: int ... 1 more field]
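
A line with fewer than three fields, or with a non-numeric ItemID or Sentiment, would make the transformation above throw at runtime. Here is a minimal sketch of a more defensive parsing step in plain Scala. The helper name parseLine is hypothetical, and silently skipping bad lines is my assumption about what you would want, not something the dataset requires.

```scala
import scala.util.Try

// Hypothetical helper: parse one CSV line into (ItemID, Sentiment, SentimentText).
// Returns None when the line has fewer than three fields or when the first
// two fields are not integers.
def parseLine(line: String): Option[(Int, Int, String)] =
  line.split(",", 3) match {
    // The limit of 3 keeps commas inside the text in the last field.
    case Array(a, b, c) => Try((a.trim.toInt, b.trim.toInt, c)).toOption
    case _              => None
  }

val ok        = parseLine("3,1, omg its already 7:30 :O")
val withComma = parseLine("10,1, hello, world")
val tooShort  = parseLine("1,0")
val notNumber = parseLine("abc,0, some text")
```

In the shell, something like file1.flatMap(parseLine(_)).toDF("ItemID","Sentiment","SentimentText") should then keep only the lines that parse.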

The resulting dataframe, fileToDf, looks like this:

scala> fileToDf.show(5)
+------+---------+--------------------+
|ItemID|Sentiment|       SentimentText|
+------+---------+--------------------+
|     1|        0|                 ...|
|     2|        0|                 ...|
|     3|        1|              omg...|
|     4|        0|          .. Omga...|
|     5|        0|         i think ...|
+------+---------+--------------------+
only showing top 5 rows

You can now take this dataframe and implement your further processing and transformations.
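
As one example of such further processing, here is a small sketch that counts the rows per sentiment label. The sample dataframe is a made-up stand-in for fileToDf with only three rows, and the local SparkSession is there just so the snippet runs on its own; in spark-shell you would skip both and use fileToDf directly.

```scala
import org.apache.spark.sql.SparkSession

// Local session so the sketch can run outside spark-shell (assumption).
val spark = SparkSession.builder().master("local[*]").appName("sentiment-counts").getOrCreate()
import spark.implicits._

// Made-up stand-in for fileToDf, using a few rows from the sample file.
val sample = Seq(
  (1, 0, "is so sad for my APL friend"),
  (2, 0, "I missed the New Moon trailer"),
  (3, 1, "omg its already 7:30 :O")
).toDF("ItemID", "Sentiment", "SentimentText")

// Number of rows for each Sentiment value, ordered by the label.
val counts = sample.groupBy("Sentiment").count().orderBy("Sentiment")
counts.show()

// Collect the small result to the driver as a Map for easy lookup.
val countsBySentiment = counts.as[(Int, Long)].collect().toMap
```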

Method 2:

The second method, which I feel is much cleaner, uses the spark.read.format method. The advantage is that you can define a schema for the data, which acts as a form of data validation when working with unknown data. To do that, I first import the type classes needed to describe the schema.

scala> import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType, IntegerType}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType, IntegerType}

Then we will go ahead and define the schema:

scala> val schemaStruct = StructType(
     |   StructField("ItemID", IntegerType) ::
     |   StructField("Sentiment", IntegerType) ::
     |   StructField("SentimentText", StringType) :: Nil
     | )
schemaStruct: org.apache.spark.sql.types.StructType = StructType(StructField(ItemID,IntegerType,true), StructField(Sentiment,IntegerType,true), StructField(SentimentText,StringType,true))

Once that is done, I can create my dataframe.

scala> val df = spark.read.format("csv").option("header", "true").schema(schemaStruct).load("Sentiment_analysis.csv")
df: org.apache.spark.sql.DataFrame = [ItemID: int, Sentiment: int ... 1 more field]

You should be able to write your dataframe transformations now.

scala> df.show(5)
+------+---------+--------------------+
|ItemID|Sentiment|       SentimentText|
+------+---------+--------------------+
|     1|        0|                 ...|
|     2|        0|                 ...|
|     3|        1|              omg...|
|     4|        0|          .. Omga...|
|     5|        0|         i think ...|
+------+---------+--------------------+
only showing top 5 rows
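
How strictly the schema validates the data depends on the CSV reader's mode option, which accepts PERMISSIVE (the default), DROPMALFORMED and FAILFAST. The following is a rough sketch of the last two, run against a made-up in-memory sample in which one row has a non-numeric ItemID. The sample lines, the local SparkSession and the names lines, dropped and strict are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import scala.util.Try

// Local session so the sketch can run outside spark-shell (assumption).
val spark = SparkSession.builder().master("local[*]").appName("csv-modes").getOrCreate()
import spark.implicits._

val schemaStruct = StructType(
  StructField("ItemID", IntegerType) ::
  StructField("Sentiment", IntegerType) ::
  StructField("SentimentText", StringType) :: Nil
)

// Made-up sample: the second data row has a non-numeric ItemID.
val lines = Seq(
  "ItemID,Sentiment,SentimentText",
  "1,0,is so sad",
  "oops,1,not a number in ItemID"
).toDS()

// DROPMALFORMED: rows that do not fit the schema are skipped when parsed.
val dropped = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .schema(schemaStruct)
  .csv(lines)
val keptRows = dropped.collect()

// FAILFAST: the first row that does not fit the schema raises an exception
// once an action forces the rows to be parsed.
val strict = spark.read
  .option("header", "true")
  .option("mode", "FAILFAST")
  .schema(schemaStruct)
  .csv(lines)
val strictResult = Try(strict.collect())
```

I use collect() rather than count() here because count() may not need to parse every column, in which case malformed rows can go unnoticed.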

This is it. Please hit the clap button in case you liked this post. Do let me know in the comments which method you liked better.

I have recently completed a book on fastText. FastText is a cool library open-sourced by Facebook for efficient text classification and for learning word embeddings. Do check out the book.
