Leo Chashnikov

Testing Spark apps locally with Scalatest

Updated: Mar 25, 2023

Years ago I wrote a blog post describing the process of building and deploying a simple Spark app. That post is now too old to be of use to anyone (it uses Spark 1.4!), and as I found myself working with Spark again, I decided to write an updated version. In this post we'll take Scala 2.12, Spark 3.3 and Scalatest, and implement and test a simple function. In the next one we'll start Spark in Standalone Cluster mode and run our application on it.


Setting up sbt project


The most natural place to start is creating a new sbt project. I've decided to use Scala 2.12 - at the time of writing Spark recommends using it. From Spark we need two packages: spark-core and spark-sql (as we'll be using the Dataset API). For testing we'll also add scalatest, which gives us the following build.sbt file:


ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.17"

lazy val root = (project in file("."))
  .settings(
    name := "sparkling",
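    // idePackagePrefix comes from the sbt-ide-settings plugin (normally added to project/plugins.sbt by IntelliJ IDEA)
    idePackagePrefix := Some("com.rayanral")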
  )

// %% appends the Scala binary version (_2.12) to the artifact name
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0"

libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.14" % "test"

Application code


Now that we have a new project, let's do something useful with it. The first idea that came to my head (possibly because I'd just spent 5 hours playing WH40k Chaos Gate) was to create a dataset of Warhammer 40k units. To begin with, we'll create a simple case class that will be used in our Dataset:


case class WarhammerUnit(
    faction: String,
    name: String,
    color: String, 
    movement: Int
)

The first thing a Warhammer fan does when he gets his hands on a new figure is paint it. And some colors are obviously preferable to others - let's create a function to reflect that:



import org.apache.spark.sql._  // we're using Datasets

// passing SparkSession as a class field is a doubtful practice that might cause serialization issues, but it's sufficient for tests
class Accelerator(sparkSession: SparkSession) extends Serializable {

  // Dataset.map needs an Encoder. This import brings implicit Encoders for case classes into scope
  import sparkSession.implicits._

  // any Ork will tell you that this is absolutely true
  def redGoezFasta(units: Dataset[WarhammerUnit]): Dataset[WarhammerUnit] = {
    units.map { unit =>
      unit.color match {
        case "Red" => unit.copy(movement = unit.movement + 5)
        case _ => unit
      }
    }
  }

}
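The per-unit rule itself does not depend on Spark, so one option is to pull it out into a plain function that can be checked without starting a SparkSession. The snippet below is only a minimal sketch of that idea: the `Acceleration` object, the `accelerate` function and the `RedBonus` constant are hypothetical names that are not part of the original project, and the case class is repeated only so that the snippet compiles on its own.

```scala
// Same shape as the WarhammerUnit case class above, repeated so the snippet is self-contained
case class WarhammerUnit(
    faction: String,
    name: String,
    color: String,
    movement: Int
)

// Hypothetical helper object (an assumption, not part of the original code)
object Acceleration {

  // movement bonus that red units receive, the same value as in redGoezFasta
  val RedBonus: Int = 5

  // pure function: red units get the bonus, all other units are returned unchanged
  def accelerate(unit: WarhammerUnit): WarhammerUnit =
    if (unit.color == "Red") unit.copy(movement = unit.movement + RedBonus)
    else unit
}
```

With such a helper, the body of redGoezFasta could be reduced to `units.map(unit => Acceleration.accelerate(unit))`, and the rule itself could be covered by ordinary unit tests that never touch Spark.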

That's all the code we need for now. Next step - testing.


Testing with Scalatest


As we'll probably have more than one test class, let's prepare a nice little trait to make our life easier:


import org.apache.spark.sql._

trait SparkTester {

    val sparkSession: SparkSession = {
      SparkSession
        .builder()
        .master("local")
        .appName("test spark app")
        .getOrCreate()
    }
}

This bit should be quite self-explanatory - we'll mix this trait into each test class that needs to test any Spark functions. Let's put it to good use.



import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers._

class RedAcceleratorTest extends AnyFlatSpec with SparkTester {

  import sparkSession.implicits._

  "accelerator" should "increase movement of red vehicles" in {
    // create a small dataset for test
    val testData = Seq(
      WarhammerUnit("Orks", "Trukk", "Red", 12),
      WarhammerUnit("Orks", "Trukk", "Blue", 12),
      WarhammerUnit("Blood Angels", "Rhino", "Red", 12),
      WarhammerUnit("Adeptus Astartes", "Librarian", "Ultramarine", 6),
    )
    // and actually submit it to spark
    val testDf = sparkSession.createDataset(testData)

    // actual "business logic" of our application
    val accelerator = new Accelerator(sparkSession)
    val resultDf = accelerator.redGoezFasta(testDf)

    // size of output didn't change, we only updated movement
    resultDf.count() should be(4)
    val redUnits = resultDf.collect().toList.filter(_.color == "Red")
    // there were two red units in input data, and they both had movement=12
    redUnits should have size(2)
    every(redUnits.map(_.movement)) should be(17)
  }
}

If you run this test from your IDE on a recent JDK (17, for example), you might get a pretty cryptic error, something like:

class org.apache.spark.storage.StorageUtils$ (in unnamed module ..) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module ..

The reason is that Spark uses the internal JDK class sun.nio.ch.DirectBuffer. Access to this class is restricted, but you can still run your unit tests by adding the following VM options:

--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED

These options are not needed when submitting your application to an actual Spark cluster.
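If the tests are launched through sbt rather than from the IDE, the same flags can be passed from build.sbt. The fragment below is a sketch under the assumption that the project uses the sbt 1.x slash syntax already seen in the build file above. Note that sbt only applies javaOptions to forked JVMs, so forking has to be enabled for the Test configuration.

```scala
// build.sbt (sketch): run tests in a separate JVM, otherwise javaOptions are ignored
Test / fork := true

// the same VM options as listed above, passed to the forked test JVM
Test / javaOptions ++= Seq(
  "--add-opens=java.base/java.lang=ALL-UNNAMED",
  "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
  "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
  "--add-opens=java.base/java.io=ALL-UNNAMED",
  "--add-opens=java.base/java.net=ALL-UNNAMED",
  "--add-opens=java.base/java.nio=ALL-UNNAMED",
  "--add-opens=java.base/java.util=ALL-UNNAMED",
  "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
  "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED",
  "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
  "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
  "--add-opens=java.base/sun.security.action=ALL-UNNAMED",
  "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
  "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
)
```

Keeping the flags in the build file means that `sbt test` behaves the same on every machine, while the IDE run configuration still needs them set separately.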

More details can be found in this StackOverflow discussion.


With that test we made sure that red, indeed, goez fast. You should now be able to easily add more functions to your application and create more tests by extending SparkTester.

All the code can be found on GitHub.


Upd. Check out the second part of the article - "Starting up Spark Standalone Cluster with Docker" - here


1 Comment


Guest
May 23

Hello,

These VM arguments did not work for me. I'm using Java 17 (corretto-17) with Gradle (7.3.3) and Scala (2.12.17); when I run unit tests that use Spark (3.4.1), they continue to fail with the same error. I must be missing something, but I've tried all these VM arguments and it's not working. Any other advice would be greatly appreciated.


https://stackoverflow.com/questions/78524978/unit-tests-with-spark-3-4-1-and-java-17-fail-even-when-using-vm-arguments


Thank you.
