DataFrame
A DataFrame is an RDD with a schema: it stores structured data in named columns.
Transformations
def select(col: String, cols: String*): DataFrame
// select a subset of columns and return them as a new DataFrame
def agg(expr: Column, exprs: Column*): DataFrame
// aggregate columns and return a new DataFrame
def groupBy(col1: String, cols: String*): RelationalGroupedDataset
// group the DataFrame on the given columns
// It's intended to be followed by agg (or max, count, ...).
def join(right: Dataset[_], joinExprs: Column): DataFrame
// inner join with another DataFrame on the given condition
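A minimal sketch of chaining these transformations, run against a local SparkSession. The `peopleDF`/`citiesDF` data and names here are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object JoinSketch extends App {
  // local session for illustration only
  val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
  import spark.implicits._

  val peopleDF = Seq((1, "Ann", 10), (2, "Bob", 20)).toDF("id", "name", "cityId")
  val citiesDF = Seq((10, "Sydney"), (20, "Melbourne")).toDF("cityId", "city")

  // inner join on an explicit condition, then project two columns
  val joinedDF = peopleDF
    .join(citiesDF, peopleDF("cityId") === citiesDF("cityId"))
    .select($"name", $"city")

  joinedDF.show()
  spark.stop()
}
```

Without a join condition, `join(right)` is a cartesian (cross) join, which is rarely what you want.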
Column
There are 3 ways to refer to a column.
- $ notation
import spark.implicits._
df.filter($"age" > 17)
- DataFrame reference
df.filter(df("age") > 17)
- SQL string (not recommended)
df.filter("age > 17")
Basic query
val sydneyPeopleDF = peopleDF
  .where($"city" === "Sydney") // where and filter are the same; note ===, not ==
  .select("id", "name")        // filter before select, so city is still available
  .orderBy("id")
val sydneyAdultsDF = peopleDF
  .filter(($"city" === "Sydney") && ($"age" > 17))
  .select("id", "name")
  .orderBy("id")
Grouping and Aggregating
val mostExpensiveDF = itemsDF
  .groupBy($"name")
  .max("price")
import org.apache.spark.sql.functions.count
val ranksDF = postsDF
  .groupBy($"authorId", $"topicId")
  .agg(count($"authorId")) // new DataFrame with columns authorId, topicId, count(authorId)
  .orderBy($"topicId", $"count(authorId)".desc) // order by topicId first, then count(authorId) descending
Clean data
These methods live on df.na (DataFrameNaFunctions).
drop the data
- drop(): drop the row if it has any null or NaN
- drop("all"): drop the row if all its values are null or NaN
- drop(Array("id")): drop the row if its id is null or NaN
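A sketch of the drop variants on toy data with missing values (the data itself is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

object NaDropSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("na-drop").getOrCreate()
  import spark.implicits._

  // Option[...] produces nullable columns
  val df = Seq(
    (Some(1), Some(100.0)),
    (Some(2), None),
    (None: Option[Int], None: Option[Double])
  ).toDF("id", "balance")

  val anyDropped = df.na.drop()            // drops rows 2 and 3 -> 1 row left
  val allDropped = df.na.drop("all")       // drops only the all-null row 3 -> 2 rows left
  val idDropped  = df.na.drop(Array("id")) // drops rows where id is null -> 2 rows left

  spark.stop()
}
```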
replace the data
- fill(0): replace all null or NaN by 0
- fill(Map("balance" -> 0)): replace null or NaN in balance by 0
- replace(Array("id"), Map("1" -> "001")): replace 1 in the column id by 001
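The fill and replace variants in context; the account/code data is invented for the sketch. Note that na.replace needs the replacement map's types to match the column type, so id is a string column here.

```scala
import org.apache.spark.sql.SparkSession

object NaFillSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("na-fill").getOrCreate()
  import spark.implicits._

  val accountsDF = Seq((Some(1), Some(100.0)), (Some(2), None: Option[Double]))
    .toDF("id", "balance")

  val filledAll = accountsDF.na.fill(0)                   // every numeric null/NaN becomes 0
  val filledBal = accountsDF.na.fill(Map("balance" -> 0)) // only balance is filled

  // string-typed id column, so "1" -> "001" type-checks
  val codesDF = Seq("1", "2").toDF("id")
  val recoded = codesDF.na.replace(Array("id"), Map("1" -> "001"))

  spark.stop()
}
```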