Advent of 2021, Day 14 – Introduction to Spark Streaming

Spark Streaming, or Structured Streaming, is a scalable and fault-tolerant, end-to-end stream processing engine built on top of the Spark SQL engine. The Spark SQL engine takes care of running the queries and producing the result sets over the data, regardless of whether it is static or arriving as a continuous stream.

Spark Streaming can use the DataFrame (or Dataset) API in Scala, Python, R or Java to handle data ingestion, build streaming analytics and perform all the computations. All these requests and workloads are executed against the Spark SQL engine.

The Spark SQL engine for structured streaming queries has undergone some changes with Spark 2.3 and now also offers a low-latency processing mode called continuous processing. This mode is capable of achieving end-to-end latencies as low as 1 millisecond for query operations on a dataframe/dataset.
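
As a side note, continuous processing currently supports only a subset of sources, sinks and query operations (map-like transformations without aggregations), so the word-count examples below still run in the default micro-batch mode. A minimal PySpark sketch of switching a query to the continuous trigger (not part of the original scripts; it uses the built-in rate source for illustration and a hypothetical checkpoint directory) could look like this:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("ContinuousTriggerSketch")\
    .getOrCreate()

# The rate source generates timestamp/value rows and is one of the sources
# that supports continuous processing (the socket source used below does not)
stream = spark\
    .readStream\
    .format('rate')\
    .option('rowsPerSecond', 10)\
    .load()

# The only change compared to a micro-batch query is the continuous trigger;
# "1 second" is the checkpoint interval, not a batch interval
query = stream\
    .writeStream\
    .format('console')\
    .option('checkpointLocation', '/tmp/continuous-checkpoint')\
    .trigger(continuous='1 second')\
    .start()

query.awaitTermination()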

Quick setup using R

Assuming that you have the installation completed, we start the master cluster.

Before starting, we will need to run a Netcat (nc) server on localhost. Netcat is a command-line utility that reads and writes data across network connections using the TCP or UDP protocols, and here it will generate and mimic the streaming data: every line typed into the Netcat terminal is sent over the socket as one streaming record. To run the Netcat server, run the following CLI command (server: localhost; port: 9999):

nc -lk 9999

Using R, we will connect to the master and create a session:

library(SparkR)
sparkR.session(appName = "StructuredStreamApp")

And we will define a dataframe where we want to store the streaming data:

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))

Copy and paste the full script into an R file (name it: Stream-word-count.R):

library(SparkR)
sparkR.session(appName = "StructuredStreamApp")

# Read host and port from the command-line arguments
args <- commandArgs(trailingOnly = TRUE)
hostname <- args[[1]]
port <- as.integer(args[[2]])
lines <- read.stream("socket", host = hostname, port = port)

words <- selectExpr(lines, "explode(split(value, ' ')) as word")

wordCounts <- count(groupBy(words, "word"))

query <- write.stream(wordCounts, "console", outputMode = "complete")
awaitTermination(query)
sparkR.session.stop()

And run this script from the CLI using spark-submit, passing it localhost and port 9999, where nc is already listening:

/bin/spark-submit /Rsample/Stream-word-count.R localhost 9999

Quick setup using Python

Similar to R, you can do this with Python (or Scala) as well, assuming that you already have the Spark SQL engine installed and that nc is up and running on localhost, port 9999.

Create a Python file (Stream-word-count.py) and copy in the following content:

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: Stream-word-count.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    host = sys.argv[1]
    port = int(sys.argv[2])

    spark = SparkSession\
        .builder\
        .appName("StructuredStreamApp")\
        .getOrCreate()

    lines = spark\
        .readStream\
        .format('socket')\
        .option('host', host)\
        .option('port', port)\
        .load()

    # Split the lines into words
    words = lines.select(
        # explode turns each item in an array into a separate row
        explode(
            split(lines.value, ' ')
        ).alias('word')
    )

    wordCounts = words.groupBy('word').count()

    query = wordCounts\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .start()

    query.awaitTermination()

And run the file from the CLI:

/bin/spark-submit /Pysample/Stream-word-count.py localhost 9999

In both cases, you should get the results back as a dataset/dataframe that is ready to be analysed.
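
If you would rather query the running counts interactively than print them to the console, one option is the memory sink, which keeps the complete result as an in-memory table. Here is a minimal PySpark sketch (not part of the original scripts; the query name counts is arbitrary, and it assumes nc is still listening on localhost, port 9999):

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession\
    .builder\
    .appName("StructuredStreamMemorySink")\
    .getOrCreate()

# Same socket word count as above
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .load()

words = lines.select(explode(split(lines.value, ' ')).alias('word'))
wordCounts = words.groupBy('word').count()

# The memory sink stores the complete result in an in-memory table named
# after queryName, which can be queried with Spark SQL while the stream runs
query = wordCounts\
    .writeStream\
    .outputMode('complete')\
    .format('memory')\
    .queryName('counts')\
    .start()

# The table starts empty and fills up as lines arrive on the socket
spark.sql('SELECT * FROM counts ORDER BY count DESC').show()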

Tomorrow we will look at dataframe operations for Spark Streaming.

The complete set of code, documents, notebooks, and all of the materials will be available at the GitHub repository: https://github.com/tomaztk/Spark-for-data-engineers

Happy Spark Advent of 2021!
