Series of Azure Databricks posts:
- Dec 01: What is Azure Databricks
- Dec 02: How to get started with Azure Databricks
- Dec 03: Getting to know the workspace and Azure Databricks platform
- Dec 04: Creating your first Azure Databricks cluster
- Dec 05: Understanding Azure Databricks cluster architecture, workers, drivers and jobs
- Dec 06: Importing and storing data to Azure Databricks
- Dec 07: Starting with Databricks notebooks and loading data to DBFS
- Dec 08: Using Databricks CLI and DBFS CLI for file upload
- Dec 09: Connect to Azure Blob storage using Notebooks in Azure Databricks
- Dec 10: Using Azure Databricks Notebooks with SQL for Data engineering tasks
- Dec 11: Using Azure Databricks Notebooks with R Language for data analytics
- Dec 12: Using Azure Databricks Notebooks with Python Language for data analytics
- Dec 13: Using Python Databricks Koalas with Azure Databricks
- Dec 14: From configuration to execution of Databricks jobs
- Dec 15: Databricks Spark UI, Event Logs, Driver logs and Metrics
- Dec 16: Databricks experiments, models and MLFlow
- Dec 17: End-to-End Machine learning project in Azure Databricks
- Dec 18: Using Azure Data Factory with Azure Databricks
- Dec 19: Using Azure Data Factory with Azure Databricks for merging CSV files
- Dec 20: Orchestrating multiple notebooks with Azure Databricks
- Dec 21: Using Scala with Spark Core API in Azure Databricks
- Dec 22: Using Spark SQL and DataFrames in Azure Databricks
Yesterday we took a closer look into the nuts and bolts of DataFrames using Spark SQL and the power of using SQL to query data.
For today we will take a glimpse into Streaming with Spark Core API in Azure Databricks.
Spark Streaming can analyse not only batches of data but also streams of data in near real time. It enables powerful interactive and analytical applications across both hot and cold data (streaming data and historical data). Spark Streaming is fault tolerant: because Spark tracks the lineage of operations, it remembers where processing stopped, and if a worker fails, another worker can recreate the data transformations from the partitioned RDD (assuming that all the RDD transformations are deterministic).
Spark Streaming has native connectors to many data sources, such as HDFS, Kafka, S3, Kinesis and even Twitter.
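To make the lineage idea more concrete, here is a minimal plain-Python sketch (not Spark code). The partitions, the transformations and the helper name compute_partition are all made up for illustration; the point is only that deterministic steps can be replayed to rebuild a lost partition.

```python
from functools import reduce

# Hypothetical input partitions; in Spark these would be partitions of an RDD.
source_partitions = {0: [1, 2, 3], 1: [4, 5, 6]}

# The "lineage": an ordered list of deterministic transformations.
lineage = [
    lambda rows: [x * 10 for x in rows],       # a map step
    lambda rows: [x for x in rows if x > 10],  # a filter step
]

def compute_partition(partition_id):
    """Rebuild one partition by replaying the lineage on its source data."""
    return reduce(lambda rows, step: step(rows), lineage,
                  source_partitions[partition_id])

# First computation, as if done by the original workers.
results = {pid: compute_partition(pid) for pid in source_partitions}

# Simulate losing partition 1 when a worker fails...
lost = results.pop(1)

# ...and recomputing it elsewhere from the source data and the lineage alone.
results[1] = compute_partition(1)
```

Because every step is deterministic, the recomputed partition is identical to the lost one. A step that depended on random numbers or the current time would break this guarantee.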
Start your workspace in Azure Databricks. Create a new notebook, name it Day23_streaming and use the default language, Python. If you decide to use EventHubs or read data from HDFS or other places, Scala might be slightly better.
Import SparkContext and StreamingContext only if you will be using the Spark context; otherwise just import the pyspark.sql namespace.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
We will be using the demo data and this demo example from databricks-datasets folder:
%fs ls /databricks-datasets/structured-streaming/events/
And you can check the structure of one file, by using:
%fs head /databricks-datasets/structured-streaming/events/file-0.json
You must do the initialisation of the stream with:
- inputPath (where your files will be coming)
- Schema of the input files
- A readStream call with the schema, the input path and additional options (such as picking one file at a time)
- An aggregate function (a count of events in this particular case) applied to the DataFrame returned by readStream.
inputPath = "/databricks-datasets/structured-streaming/events/"

# Define the schema to speed up processing
jsonSchema = StructType([
    StructField("time", TimestampType(), True),
    StructField("action", StringType(), True)
])

streamingInputDF = (
    spark
    .readStream
    .schema(jsonSchema)                # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)   # Treat a sequence of files as a stream, one file at a time
    .json(inputPath)
)

streamingCountsDF = (
    streamingInputDF
    .groupBy(
        streamingInputDF.action,
        window(streamingInputDF.time, "1 hour"))
    .count()
)
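To see what the windowed count is doing, here is a rough plain-Python sketch of the same idea: events arrive in small batches (as with one file per trigger), each event is assigned to a 1-hour tumbling window, and a running count per action and window is kept. The records below are invented for illustration and are not taken from the demo dataset. The helper window_start is hypothetical, and it assumes windows aligned to the full hour.

```python
from collections import Counter
from datetime import datetime, timedelta

def window_start(ts):
    """Truncate a timestamp to the start of its 1-hour tumbling window."""
    return ts.replace(minute=0, second=0, microsecond=0)

# Hypothetical micro-batches, one per "file" (made-up records).
batches = [
    [(datetime(2016, 7, 26, 2, 45), "Open"),
     (datetime(2016, 7, 26, 2, 50), "Close")],
    [(datetime(2016, 7, 26, 2, 55), "Open"),
     (datetime(2016, 7, 26, 3, 5), "Open")],
]

counts = Counter()   # running state kept across triggers
snapshots = []       # full result table after each trigger

for batch in batches:
    for ts, action in batch:
        start = window_start(ts)
        end = start + timedelta(hours=1)
        counts[(action, start, end)] += 1
    snapshots.append(dict(counts))
```

After the second batch, the "Open" count for the 02:00 to 03:00 window has grown from 1 to 2 and a new 03:00 to 04:00 window has appeared. Spark maintains this kind of state for you between triggers.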
You start a streaming computation by defining a sink and starting it. In this case, to query the counts interactively, set the complete set of 1 hour counts to be in an in-memory table.
Run the following command to examine the outcome of a query.
query = (
    streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only)
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)
Once the cluster is running, you can do a variety of analyses. The key component is the .start() method, which launches the streaming query so that Spark keeps processing the incoming data.
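While the stream is running, it can be handy to check on it from the notebook. The helper below is a small sketch, not part of the original notebook. The function name summarize_query is made up, and it assumes a PySpark version where lastProgress behaves like a dictionary with a numInputRows key. It only reads attributes that a StreamingQuery exposes (name, isActive, status, lastProgress).

```python
def summarize_query(query):
    """Collect a few status fields from a StreamingQuery-like object."""
    progress = query.lastProgress  # None until the first trigger has completed
    return {
        "name": query.name,
        "active": query.isActive,
        "status": query.status,
        "rows_in_last_batch": progress["numInputRows"] if progress else None,
    }
```

In the notebook you would call summarize_query(query) with the query object returned by .start(), and call query.stop() once you are done with the stream.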
You can also further shape the data by using Spark SQL:
%sql
SELECT
   action
  ,date_format(window.end, "MMM-dd HH:mm") as time
  ,count
FROM counts
ORDER BY time, action
Tomorrow we will explore Spark’s own MLlib package for Machine Learning using Azure Databricks.
The complete set of code and SQL notebooks (including HTML) will be available at the GitHub repository.
Happy Coding and Stay Healthy!