Series of Apache Spark posts:
- Dec 01: What is Apache Spark
- Dec 02: Installing Apache Spark
- Dec 03: Getting around CLI and WEB UI in Apache Spark
- Dec 04: Spark Architecture – Local and cluster mode
- Dec 05: Setting up Spark Cluster
- Dec 06: Setting up IDE
- Dec 07: Starting Spark with R and Python
- Dec 08: Creating RDD files
- Dec 09: RDD Operations
- Dec 10: Working with data frames
- Dec 11: Working with packages and spark DataFrames
- Dec 12: Spark SQL
- Dec 13: Spark SQL Bucketing and partitioning
- Dec 14: Spark SQL query hints and executions
Spark Streaming, or Structured Streaming, is a scalable and fault-tolerant, end-to-end stream processing engine built on the Spark SQL engine. The Spark SQL engine is responsible for producing the result sets over streaming data, regardless of whether the data is static or arriving as a continuous stream.
Spark streaming can use the DataFrame (or Dataset) API in Scala, Python, R or Java to handle data ingestion, build streaming analytics and run all the computations. All these requests and workloads are executed against the Spark SQL engine.
The Spark SQL engine for structured streaming queries has undergone some changes with Spark 2.3 and now also offers a low-latency processing mode called continuous processing. This mode is capable of achieving end-to-end latencies as low as 1 millisecond for changes or query operations on a DataFrame/Dataset.
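As a quick illustration, continuous processing is opt-in and is enabled by setting a continuous trigger on the streaming query. Below is a minimal PySpark sketch, not part of the walkthrough that follows; the app name, the rate source and the 1-second interval are illustrative choices, since only a subset of sources and sinks (e.g. Kafka, the rate source, the console sink) support this mode:

# Minimal sketch: enabling continuous processing (available since Spark 2.3).
# Note: only some sources/sinks support it (e.g. Kafka, rate source, console sink).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ContinuousTriggerApp").getOrCreate()

# The rate source generates (timestamp, value) rows, handy for testing
df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

# trigger(continuous=...) switches the query from micro-batching to
# continuous mode; the interval is the checkpoint frequency, not the latency
query = df.writeStream \
    .format("console") \
    .trigger(continuous="1 second") \
    .start()

query.awaitTermination()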
Quick setup using R
Assuming that you have completed all the installation steps, we start by bringing up the master cluster.
Before starting, we need to run a Netcat (nc) server on localhost. Netcat is a command-line utility that reads and writes data across network connections using the TCP or UDP protocols, and here it will generate and mimic the streaming data. To run the Netcat server, run the following CLI command (server: localhost; port: 9999):
nc -lk 9999
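Once a streaming query is connected to this socket, every line you type into the nc terminal is sent as stream data. For example, typing:

apache spark
apache hadoop

feeds two lines of text to the query listening on port 9999.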
Using R, we connect to the master and create a session.
library(SparkR)
sparkR.session(appName = "StructuredStreamApp")
Next, we define a DataFrame into which the streaming data will be read:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))
Copy and paste this script into an R file (name it Stream-word-count.R):
library(SparkR)
sparkR.session(appName = "StructuredStreamApp")

# Read hostname and port from the command-line arguments
args <- commandArgs(trailingOnly = TRUE)
hostname <- args[[1]]
port <- as.integer(args[[2]])

# Create DataFrame representing the stream of input lines
lines <- read.stream("socket", host = hostname, port = port)

# Split the lines into words and generate the running word count
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
wordCounts <- count(groupBy(words, "word"))

# Start the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

awaitTermination(query)
sparkR.session.stop()
And run this script from the CLI using spark-submit, pointing it to localhost on port 9999, where nc is already listening:
/bin/spark-submit /Rsample/Stream-word-count.R localhost 9999
Quick setup using Python
Similar to R, you can do this with Python (or Scala) as well, assuming that you already have the Spark SQL engine installed and nc up and running on localhost, port 9999.
Create a Python file (Stream-word-count.py) and copy in the following content:
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: Stream-word-count.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    # Read hostname and port from the command-line arguments
    host = sys.argv[1]
    port = int(sys.argv[2])

    spark = SparkSession\
        .builder\
        .appName("StructuredStreamApp")\
        .getOrCreate()

    # Create DataFrame representing the stream of input lines from host:port
    lines = spark\
        .readStream\
        .format('socket')\
        .option('host', host)\
        .option('port', port)\
        .load()

    # Split the lines into words
    words = lines.select(
        # explode turns each item in an array into a separate row
        explode(
            split(lines.value, ' ')
        ).alias('word')
    )

    # Generate running word count
    wordCounts = words.groupBy('word').count()

    # Start the query that prints the running counts to the console
    query = wordCounts\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .start()

    query.awaitTermination()
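The complete output mode used above rewrites the entire result table on every trigger, which is fine for a small running aggregate like this word count. If you prefer to see only the rows whose counts changed since the last trigger, a variant of the last step (everything else in the script stays the same) is to use update mode:

# Variant: write only the rows updated since the last trigger
query = wordCounts\
    .writeStream\
    .outputMode('update')\
    .format('console')\
    .start()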
And run the file from the CLI:
/bin/spark-submit /Pysample/Stream-word-count.py localhost 9999
In both cases, you should get the results back as a streaming DataFrame/Dataset that is ready to be analysed.
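For example, if you type apache spark followed by apache hadoop into the nc terminal, the console sink prints the running counts batch by batch, along these lines (exact formatting varies between Spark versions):

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+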
Tomorrow we will look at DataFrame operations for Spark streaming.
Complete set of code, documents, notebooks, and all of the materials will be available at the Github repository: https://github.com/tomaztk/Spark-for-data-engineers
Happy Spark Advent of 2021!