This is a continuation of "Testing Spark Streaming locally with EmbeddedKafka". If you're not familar with EmbeddedKafka - I'd recommend you to briefly go through first article, I won't be getting into much detail this time.
In previous article we've used EmbeddedKafka to send a String, and then receive it in Spark Streaming application. Real-world applications usually use more complex data structures, and just sending a String won't be enough. Let's see how we can work with case classes, utilizing EmbeddedKafka.
As I've previously noted, Spark allows you to write application using Structures Streaming in the same way as you'd processing batch data - Spark would handle all the complex stuff under the hood. And specifically to demonstrate that, I'll be using the code from another of my articles - "Testing Spark apps locally with Scalatest" - without any changes, so that same "code under test" could be used both from batch and streaming tests.
Setting up dependencies and utility classes
You should already know that part from the previous article, so I won't stop here for long.
// spark dependencies
libraryDependencies += "org.apache.spark" % "spark-core_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.3.0"
// test dependencies
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.15" % Test
libraryDependencies += "io.github.embeddedkafka" %% "embedded-kafka" % "3.4.0" % Test
And a utility class to set up all the infra we need to run tests:
trait SparkStreamingTester
extends SparkTester
with Suite
with BeforeAndAfterEach {
val checkpointDir: File = Files.createTempDirectory("stream_checkpoint").toFile
// nit little trick to delete checkpoint directory when test finishes
FileUtils.forceDeleteOnExit(checkpointDir)
sparkSession.sparkContext.setCheckpointDir(checkpointDir.toString)
implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
kafkaPort = 9092,
zooKeeperPort = 2181,
customBrokerProperties = Map("auto.create.topics.enable" -> "true")
)
override def beforeEach(): Unit = {
EmbeddedKafka.start()(kafkaConfig)
}
override def afterEach(): Unit = {
EmbeddedKafka.stop()
}
}
Serializing / deserializing Kafka messages
This time we won't be just sending Strings but real case classes, and that's probably how you'll going to use Spark Streaming in real-life applications.
Let's start by creating this class first:
case class WarhammerUnit(faction: String, name: String, color: String, movement: Int)
Next, how do we send it through Kafka? Process of converting snapshot of an object into some form that can be transported is called serialization. Accompanying process of re-creating object in memory from such form is deserialization. So we'll have to create a serializer and provide it to Kafka when we're publishing an instance of our case class, so that instance can be converted. In our case I decided to go with Json serialization - this allows us to exchange messages between entirely different systems, and as a bonus messages are human-readable out of the box. If you're just passing messages between different pipelines, you can go with byte serialization instead - trading off some readability and ease of integration for smaller message size, and potentially faster SerDe process.
Code I'm providing here is shortened for readability's sake, and I'm not doing any error handling etc - it's good enough to show an example, but you'll need something a bit better in production:
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.Serializer
class TestJsonSerializer[T] extends Serializer[T] {
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
override def serialize(topic: String, data: T): Array[Byte] =
mapper.writeValueAsBytes(data)
}
We're relying on FasterXML/jackson to do most of the work here, and as we extend kafka Serializer interface, we can use this class for serializing objects passed to Kafka.
Now we'll do the same thing for Deserializer - super-simplistic, library for all the heavy lifting:
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.Deserializer
import scala.reflect.ClassTag
class TestJsonDeserializer[T](implicit tag: ClassTag[T])
extends Deserializer[T] {
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
override def deserialize(topic: String, bytes: Array[Byte]): T =
mapper.readValue(bytes, tag.runtimeClass.asInstanceOf[Class[T]])
}
Let's return to the test, and see how to use those.
The actual application under test
Code we're testing would be super-simple - we'll get a Dataset of Warhammer units, check their color, and increase their movement if the color is right. Which is the right color? Obviously it's red! Red goez fasta!
class Accelerator(sparkSession: SparkSession) extends Serializable {
import sparkSession.implicits._
def redGoezFasta(units: Dataset[WarhammerUnit]): Dataset[WarhammerUnit] = {
units.map { unit =>
unit.color match {
case "Red" => unit.copy(movement = unit.movement + 5)
case _ => unit
}
}
}
}
One important thing to note here is what I've promised in the beginning of the post - Spark allows you to abstract out types of data you're processing, and just concentrate on business object and rules for their behaviour. The fact that we're running streams, reading Kafka, serializing/deserializing data is not mentioned at all.
Let's see how it all works together now - and for that we'll need to write a test.
All the fun - integration testing with EmbeddedKafka
Now here's the part that would actually know about stuff that happens under the hood - publishing the message and reading result from the stream.
I'll split out all the code in chunks, and explain purpose of each bit. Some groundwork first:
val testData = WarhammerUnit("Orks", "Trukk", "Red", 12)
val decoderFn = (v: Array[Byte]) => {
val deserializer: Deserializer[WarhammerUnit] =
new TestJsonDeserializer[WarhammerUnit]
deserializer.deserialize("", v)
}
val decoderUdf = udf(decoderFn)
val df = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", s"localhost:${kafkaConfig.kafkaPort}")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
val unitDf: Dataset[WarhammerUnit] = df.withColumn("decoded", decoderUdf(col("value")))
.select("decoded.*")
.as[WarhammerUnit]
We're preparing the "bootstrapping" part - this is what will happen on server that receives the message and does the processing. We need a Deserializer for this, also we need to connect running Spark session to Kafka server and subscribe it to specific topic. startingOffsets is set to earliest to avoid any possible race conditions, though in real life I doubt you would want to do that - as Spark application will immediately start processing all the data that is in Kafka topic, and that might be days or even months of data.
readStream method gives us a DataFrame, as Spark has no understanding what kind of data can the stream contain - but we know that we'll be sending our serialized class in "value" column, so we take it, run through deserialization, and convert to an instance of a case class.
val accelerator = new Accelerator(sparkSession)
val resultDf = accelerator.redGoezFasta(unitDf)
And that was all the application we need - create instance of our class with business logic, pass Dataset to it, get resultDf back. If Dataset would be coming from csv file, or from database - nothing would've change.
All the preparatory work is now complete, our application is connected and ready to read messages from the stream - we just have to publish them
val query = resultDf.writeStream
.format("memory")
.queryName(tableName)
.outputMode(OutputMode.Append())
.trigger(Trigger.Once())
.start()
This is part where our test connects to the pipe that "comes out" of our business logic, and receives the processed result - we don't need to persist result, just save it to in-memory table for our inspection.
implicit val serializer: Serializer[WarhammerUnit] =
new TestJsonSerializer[WarhammerUnit]
EmbeddedKafka.publishToKafka(topic, testData)
query.processAllAvailable()
Finally, we'll publish the message to a topic - and then by calling processAllAvailable we'll block until the stream would finish - this method should not be used in production, it's intended for testing purposes only, and it suits this purpose very well. That's it, our message was published, received on server, processed, and sent further down the stream - back into our test. Let's see if it is what we expected it to be.
val results = sparkSession.sql(f"SELECT * FROM $tableName").as[WarhammerUnit].collect()
results.length should be(1)
val result = results.head
result.name should be(testData.name)
result.movement should be(17)
We collect results from in-memory table we created several lines ago, cast it to WarhammerUnit (no failures expected, as we know what our application sent us), collect and check that movement speed was actually increased.
As an excercise to the reader, you might want to test that movement is unchanged for other colors)
Wrap up
Recapping shortly - we've just seen that Spark application allows you to easily reuse code between Structured streaming and batch modes, and for common use cases it's an absolutely seamless transition. We've used EmbeddedKafka and Json SerDe to send and receive messages, and seen how to write a good integration test for Spark streaming application.
You can find all the code in my github repo as usual.
Comments