Pair RDD

In large-scale data processing and distributed computing, it is very common to operate on data in the form of key-value pairs.

In single-node computing, the data structure for this is Map.

In Spark, the equivalent is the pair RDD:

RDD[(K, V)]

Create a pair RDD

import org.apache.spark.rdd.RDD

case class Article(title: String, texts: String)   // assumed shape of Article

val rdd: RDD[Article] = ???

val pairRdd: RDD[(String, String)] =
  rdd.map(article => (article.title, article.texts))

Transformations

  • groupByKey

  • reduceByKey

  • mapValues

  • keys

  • join

  • leftOuterJoin/rightOuterJoin

groupByKey

Recall: groupBy on Scala collections is:

def groupBy[K](f: A => K): Map[K, Traversable[A]]

For example,

val ages = List(1, 10, 20, 40, 66)
val people = ages.groupBy { age =>
  if (age < 18) "child"
  else if (age >= 18 && age < 60) "adult"
  else "senior"
}

println(people)
// Map("child" -> List(1, 10), "adult" -> List(20, 40), "senior" -> List(66))

Now, let's see groupByKey on a pair RDD:

def groupByKey(): RDD[(K, Iterable[V])]

For example,

// sample data (hypothetical, chosen to match the outputs shown below)
case class Event(name: String, budget: Int)
val events = Seq(Event("Adobe", 1000), Event("Google", 1000), Event("Google", 2500))
// create a pair RDD
val eventsRDD = sc.parallelize(events).map(event => (event.name, event.budget))
val groupedRDD = eventsRDD.groupByKey()

groupedRDD.collect().foreach(println)
// (Adobe, CompactBuffer(1000))
// (Google, CompactBuffer(1000, 2500))
// ...

reduceByKey

It combines groupByKey and reducing in a single step. It is more efficient than groupByKey followed by a reduction, because values are combined within each partition before any data is shuffled across the network.

def reduceByKey(f: (V, V) => V): RDD[(K, V)]

For example,

val budgetsRDD = eventsRDD.reduceByKey(_ + _)

budgetsRDD.collect().foreach(println)
// (Adobe, 1000)
// (Google, 3500)
// ...
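
For comparison, here is a sketch of the same totals computed with groupByKey followed by a per-key sum; it gives the same result, but every individual budget value is shuffled across the network before being summed:

// same result as reduceByKey(_ + _), but all values are shuffled first
val budgetsViaGroupRDD = eventsRDD.groupByKey().mapValues(_.sum)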

mapValues

It applies a function to each value of the pair RDD, leaving the keys unchanged.

def mapValues[U](f: V => U): RDD[(K, U)]

For example,

val doubleBudgetsRDD = eventsRDD.mapValues(_ * 2)

doubleBudgetsRDD.collect().foreach(println)
// (Adobe, 2000)
// (Google, 2000)
// (Google, 5000)
// ...

Join

There are two kinds (a small combined example follows this list):

  • inner join: join

    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    
    val joinedRDD = leftRDD.join(rightRDD)
    

    The keys in joinedRDD must exist in both leftRDD and rightRDD.

  • outer join:

    • leftOuterJoin

      def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
      
      val joinedRDD = leftRDD.leftOuterJoin(rightRDD)
      

      The keys in joinedRDD only need to exist in leftRDD; keys with no match in rightRDD get None as the right-hand value.

    • rightOuterJoin

      def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
      
      val joinedRDD = leftRDD.rightOuterJoin(rightRDD)
      

      The keys in joinedRDD only need to exist in rightRDD; keys with no match in leftRDD get None as the left-hand value.
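
For illustration, here is a small sketch of all three joins, using two hypothetical pair RDDs:

val leftRDD  = sc.parallelize(Seq(("Adobe", 1000), ("Google", 2500)))
val rightRDD = sc.parallelize(Seq(("Google", "Zurich"), ("Amazon", "Seattle")))

leftRDD.join(rightRDD).collect()
// Array((Google, (2500, Zurich)))   only keys present on both sides

leftRDD.leftOuterJoin(rightRDD).collect()
// Array((Adobe, (1000, None)), (Google, (2500, Some(Zurich))))

leftRDD.rightOuterJoin(rightRDD).collect()
// Array((Google, (Some(2500), Zurich)), (Amazon, (None, Seattle)))   order may vary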

Actions

  • countByKey

countByKey

It counts the number of elements for each key and returns the result to the driver as a Map.

def countByKey(): Map[K, Long]
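
For example, with the sample events used above (note that countByKey is an action, so the result is a plain Scala Map on the driver, not an RDD):

val counts = eventsRDD.countByKey()
// Map(Adobe -> 1, Google -> 2)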

Exercise for transformations and actions

Calculate the average budget for each event.

val averageBudgetRDD = eventsRDD
  .mapValues(budget => (budget, 1))                        // pair each budget with a count of 1
  .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) // sum budgets and counts per key
  .mapValues { case (totalBudget, count) => totalBudget.toDouble / count }
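
With the sample events used above, collecting the result would print something like:

averageBudgetRDD.collect().foreach(println)
// (Adobe, 1000.0)
// (Google, 1750.0)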