
Cassandra Sink for Spark Structured Streaming

A couple of months ago I started exploring Spark, and at some point I ran into the problem of saving the results of Structured Streaming computations to a Cassandra database.



In this post, I give a simple example of creating and using a Cassandra sink for Spark Structured Streaming. I hope the post will be useful to those who have recently started working with Spark Structured Streaming and are wondering how to write the results of their computations to a database.



The idea of the application is very simple: receive and parse messages from Kafka, perform simple transformations in Spark, and save the results to Cassandra.



Advantages of Structured Streaming



Structured Streaming is described in detail in the documentation. In short, it is a scalable stream processing engine built on the Spark SQL engine. It lets you use the Dataset/DataFrame API to aggregate data, compute window functions, perform joins, and so on. In other words, Structured Streaming lets you use good old SQL to work with data streams.
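As a small taste of what this looks like in practice, here is a minimal sketch (not from this project; the events DataFrame and its column names are assumptions) of a windowed aggregation over a streaming DataFrame:

import org.apache.spark.sql.functions.{col, window}

// Assuming `events` is a streaming DataFrame with columns `eventTime: Timestamp` and `fx_marker: String`,
// a one-minute tumbling-window count per currency pair is written as ordinary DataFrame code:
val counts = events
  .groupBy(window(col("eventTime"), "1 minute"), col("fx_marker"))
  .count()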


What is the problem?



The stable release of Spark Structured Streaming came out in 2017. In other words, it is a fairly new API in which the basic functionality is implemented, but some things you have to build yourself. For example, Structured Streaming has standard sinks for writing output to a file, Kafka, the console, or memory, but to save data to a database you have to use the foreach sink and implement the ForeachWriter interface. As of Spark 2.3.1, this can be done only in Scala and Java.
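For comparison, the built-in sinks need no custom code at all. For example, writing the query results to the console (using the parsed streaming DataFrame that is defined later in this post) looks roughly like this:

// Built-in console sink: handy for debugging, no ForeachWriter required
val consoleQuery = parsed
  .writeStream
  .outputMode("update")
  .format("console")
  .start()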



I assume that the reader already knows how Structured Streaming works in general terms, knows how to implement the necessary transformations, and is now ready to write the results to a database. If some of these steps are unclear, the official documentation is a good starting point for learning Structured Streaming. In this article I want to focus on the last step, saving the results to a database.



Below, I describe an example implementation of a Cassandra sink for Structured Streaming and explain how to run it on a cluster. The full code is available here.



When I first encountered the problem described above, this project turned out to be very useful. However, it may seem a bit tricky if the reader has just started working with Structured Streaming and is looking for a simple example of how to write data to Cassandra. In addition, that project is written to work in local mode and requires some changes to run on a cluster.



I also want to give examples of how to save data to MongoDB and to any other database accessible via JDBC.
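For a JDBC-accessible database, the same foreach approach described in the next section applies. Below is a minimal, hypothetical sketch rather than this project's code; the table name, column list, and connection parameters are placeholders:

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical JDBC sink following the same open/process/close pattern as the Cassandra sink below
class JdbcSinkForeach(url: String, user: String, password: String) extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: PreparedStatement = _

  def open(partitionId: Long, version: Long): Boolean = {
    // One connection per partition, reused for every row of that partition
    connection = DriverManager.getConnection(url, user, password)
    statement = connection.prepareStatement(
      "insert into spark_struct_stream_sink (fx_marker, timestamp_ms, timestamp_dt) values (?, ?, ?)")
    true
  }

  def process(record: Row): Unit = {
    // setObject lets the JDBC driver map the Spark SQL values (strings, timestamps, dates) itself
    for (i <- 0 until record.length) statement.setObject(i + 1, record.get(i))
    statement.executeUpdate()
  }

  def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}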



A simple solution



To write data to an external system, you have to use the foreach sink. You can read more about it here. In short, you need to implement the ForeachWriter interface: define how to open a connection, how to process each portion of data, and how to close the connection when processing is finished. The source code is as follows:



import org.apache.spark.sql.ForeachWriter

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
  // This class implements the ForeachWriter interface, whose methods get called
  // whenever a sequence of rows is generated as output
  val cassandraDriver = new CassandraDriver();
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    true
  }
  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(s"""
        insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
        values('${record(0)}', '${record(1)}', '${record(2)}')""")
    )
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }
}


I will describe the definition of CassandraDriver and the structure of the output table later, but for now let's take a closer look at how the code above works. To connect to Cassandra from Spark, I create a CassandraDriver object that provides access to CassandraConnector, a connector developed by DataStax. CassandraConnector takes care of opening and closing the connection to the database, so in the open and close methods of the CassandraSinkForeach class I only print debug messages.



The above code is called from the main application as follows:



val sink = parsed
  .writeStream
  .queryName("KafkaToCassandraForeach")
  .outputMode("update")
  .foreach(new CassandraSinkForeach())
  .start()


An instance of CassandraSinkForeach is created for each partition of the output data, so each worker node inserts its portion of the rows into the database. In other words, each worker node executes val cassandraDriver = new CassandraDriver(). This is what CassandraDriver looks like:



import com.datastax.spark.connector.cql.CassandraConnector

class CassandraDriver extends SparkSessionBuilder {
  // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
  // It extends SparkSessionBuilder so as to use the same SparkSession on each node.
  val spark = buildSparkSession

  import spark.implicits._

  val connector = CassandraConnector(spark.sparkContext.getConf)

  // Define the Cassandra table which will be used as a sink
  /* For this app I used the following table:
       CREATE TABLE fx.spark_struct_stream_sink (
         fx_marker text,
         timestamp_ms timestamp,
         timestamp_dt date,
         primary key (fx_marker));
  */
  val namespace = "fx"
  val foreachTableSink = "spark_struct_stream_sink"
}


Let's take a closer look at the spark object. The code for SparkSessionBuilder is as follows:



import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

class SparkSessionBuilder extends Serializable {
  // Build a Spark session. The class is made serializable to get access to the SparkSession
  // on the driver and on the executors. Note the usage of @transient lazy val.
  def buildSparkSession: SparkSession = {
    @transient lazy val conf: SparkConf = new SparkConf()
      .setAppName("Structured Streaming from Kafka to Cassandra")
      .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
      .set("spark.sql.streaming.checkpointLocation", "checkpoint")
    @transient lazy val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    spark
  }
}


On each worker node, SparkSessionBuilder provides access to the SparkSession that was created on the driver. To make such access possible, SparkSessionBuilder must be serializable, and conf and spark must be declared as @transient lazy val, which lets the serialization machinery skip these objects when the program is initialized, until the moment they are first accessed. Thus, when the application is launched, buildSparkSession is serialized and sent to each worker node, but the conf and spark objects are resolved only when a worker node accesses them.
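To illustrate the idea outside of Spark, here is a small self-contained sketch (not from the project) showing that a @transient lazy val is skipped during Java serialization and recomputed on first access after deserialization:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class Holder extends Serializable {
  // Excluded from the serialized form; evaluated lazily wherever it is first accessed
  @transient lazy val resource: String = {
    println("initializing resource")
    "created on first access"
  }
}

object TransientLazyDemo extends App {
  val bytes = new ByteArrayOutputStream()
  new ObjectOutputStream(bytes).writeObject(new Holder)  // nothing is initialized or written for `resource`

  val copy = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[Holder]
  println(copy.resource)                                 // initialized here, on the "receiving" side
}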



Now let's look at the main application code:



import org.apache.spark.sql.functions.{from_json, from_unixtime, to_date}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object KafkaToCassandra extends SparkSessionBuilder {
  // Main body of the app. It also extends SparkSessionBuilder.
  def main(args: Array[String]) {
    val spark = buildSparkSession

    import spark.implicits._

    // Define the location of the Kafka brokers:
    val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"

    /* Here is an example message which I get from the Kafka stream. It contains multiple JSONs separated by \n:
       {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"}
       {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"}
       {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"}
       {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"}
    */

    // Read the incoming stream
    val dfraw = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", "currency_exchange")
      .load()

    val schema = StructType(
      Seq(
        StructField("fx_marker", StringType, false),
        StructField("timestamp_ms", StringType, false)
      )
    )

    val df = dfraw
      .selectExpr("CAST(value AS STRING)").as[String]
      .flatMap(_.split("\n"))

    val jsons = df.select(from_json($"value", schema) as "data").select("data.*")

    // Process data. Create a new date column
    val parsed = jsons
      .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS")))
      .filter("fx_marker != ''")

    // Output results into a database
    val sink = parsed
      .writeStream
      .queryName("KafkaToCassandraForeach")
      .outputMode("update")
      .foreach(new CassandraSinkForeach())
      .start()

    sink.awaitTermination()
  }
}


When the application is submitted for execution, buildSparkSession is serialized and sent to the worker nodes, but the conf and spark objects remain unresolved. The driver then creates a spark object inside KafkaToCassandra and distributes the work among the worker nodes. Each worker node reads data from Kafka and performs simple transformations on the portion of records it receives, and when a worker node is ready to write its results to the database, it resolves the conf and spark objects, thereby getting access to the SparkSession created on the driver.



How to build and run the application?



When I switched from PySpark to Scala, it took me some time to figure out how to build the application, so I included a Maven pom.xml in the project. The reader can build the application with Maven by running the mvn package command. After that, the application can be submitted for execution with:



./bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 \
  --class com.insight.app.CassandraSink.KafkaToCassandra \
  --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 \
  target/cassandra-sink-0.0.1-SNAPSHOT.jar


To build and run the application, you need to replace the names of my AWS machines with your own (i.e., replace everything that looks like ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).



Spark, and Structured Streaming in particular, is a new topic for me, so I will be very grateful to readers for comments, discussion, and corrections.

Source: https://habr.com/ru/post/425503/


