Multiple Stateful Operators in Structured Streaming

Introduction:

In the rapidly evolving world of data engineering, the requirements for processing large volumes of data with low latency have changed drastically. Traditional data operations such as filtering, joining, aggregating, and writing results remain as essential as ever, but the data volumes and latencies they must handle have grown far beyond what classic batch ETL was built for. Apache Spark’s Structured Streaming offers a powerful way to meet these requirements, and with the introduction of Project Lightspeed it now supports running multiple stateful operators within a single stream, significantly reducing complexity, latency, and cost. In this article, we explore stateful operators and watermarks in Structured Streaming and demonstrate how to chain multiple stateful operators in a single stream using a practical example.

Structured Streaming: Performing Stateful Data Operations in Apache Spark

In the world of data engineering, certain operations have been used since the inception of Extract, Transform, Load (ETL) processes. These operations include filtering, joining, aggregation, and writing results. However, the requirements for latency and throughput have significantly changed. Handling a few events or gigabytes of data per day is no longer sufficient. Nowadays, businesses need to process terabytes or even petabytes of data daily, with low latencies measured in seconds or minutes.

Apache Spark’s Structured Streaming is an open-source stream processing engine designed for handling large data volumes with low latency. It serves as the core streaming technology of the Databricks Lakehouse platform. With the introduction of Project Lightspeed, users can now chain multiple stateful operations within a single stream, simplifying pipelines, reducing latency, and cutting costs.

Stateful Operators: Working with Contextual Data

When it comes to Structured Streaming, operators can be categorized into two types: stateless and stateful. Stateless operators perform operations without relying on information from previous microbatches. For example, filtering records to retain rows with values greater than 10 is a stateless operation since it only considers the current data being processed.

On the other hand, stateful operators require additional context beyond the current microbatch. For instance, calculating a count of values over 5-minute windows requires storing the running count for each key within the aggregation, regardless of how the underlying records are spread across microbatches. This stored data is referred to as state, and operators that rely on it are known as stateful operators. Common stateful operators include aggregations, joins, and deduplication.
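To make the distinction concrete, here is a minimal sketch of both kinds of operator; the `events` DataFrame and its `value`, `eventTimestamp`, and `userId` columns are assumptions for illustration:

```python
from pyspark.sql.functions import window

# Stateless: each row is evaluated on its own; nothing carries over between microbatches.
filtered = events.filter(events.value > 10)

# Stateful: the running count per (5-minute window, userId) must be kept in state
# across microbatches until the window can be finalized.
counts = (
    events
        .groupBy(window(events.eventTimestamp, "5 minutes"), events.userId)
        .count()
)
```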

Watermarks: Managing Late Data and State Retention

Every stateful operator in Structured Streaming needs to specify a watermark—a crucial aspect of managing late data and state retention. A watermark serves two purposes: defining the allowed lateness of data and determining how long to keep the state.

Suppose we’re processing a dataset with records containing event timestamps, and we’re aggregating data within 5-minute windows based on these timestamps. What happens if some records arrive out of order? Should we include a record with a timestamp of 12:04 in the 12:00-12:05 aggregation, considering we’ve already processed records with timestamps of 12:11? Additionally, how long should we retain the state for the 12:00-12:05 window? We don’t want the state data to accumulate indefinitely, as it could impact performance. This is where watermarks play a key role.

By utilizing the `.withWatermark` setting, it becomes possible to specify the allowed delay in seconds, minutes, hours, or days. Structured Streaming then determines when records stored in state are no longer needed. For example, specifying a watermark of “10 minutes” signifies that data arriving up to 10 minutes late will be accepted, based on the event timestamp column. Structured Streaming calculates and saves a watermark timestamp at the end of each microbatch, subtracting the specified time interval from the latest event timestamp received.
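As a rough illustration of that arithmetic (plain Python rather than a Spark API, with assumed timestamps):

```python
from datetime import datetime, timedelta

# Latest event timestamp observed so far across the stream (assumed value).
max_event_time = datetime(2023, 6, 2, 12, 20)

# The delay threshold passed to .withWatermark("eventTimestamp", "10 minutes").
allowed_lateness = timedelta(minutes=10)

# Watermark saved at the end of the microbatch: 2023-06-02 12:10.
# Records and state older than this are eligible to be dropped.
watermark = max_event_time - allowed_lateness
```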

Upon the start of each microbatch, Structured Streaming compares the event timestamps of incoming records and the data currently in state with the watermark timestamps. Any input records and state with timestamps earlier than the watermark values are dropped. Watermarks allow Structured Streaming to handle late records and state effectively, regardless of the number of stateful operators present in a single stream.

Example: Chained Time Window Aggregations

To demonstrate the use of multiple stateful operators, let’s explore an example involving chained time window aggregations.

We begin by receiving a stream of raw events, where our goal is to count the number of events that occur every 10 minutes per user. We then intend to calculate the average of these counts per hour before writing out the result.

First, we read the source data using a standard `readStream` call. Any streaming source supported by Structured Streaming can be used in this step.
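As an illustration, here is a minimal sketch that reads the raw events from a Delta table; the format and path are assumptions, and Kafka, Auto Loader, or any other supported source would work the same way:

```python
# Read the raw events as a streaming DataFrame. The Delta source and path are
# hypothetical -- substitute whichever streaming source you actually use.
events = (
    spark.readStream
        .format("delta")
        .load("/data/raw_events")
)
```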

Next, we perform the first windowed aggregation on the `userId` field. Alongside defining the timestamp column and the window’s duration, we also specify a watermark to handle late data; for this example, we allow data to arrive up to one minute late. The code snippet for this aggregation is as follows:

```python
from pyspark.sql.functions import window

eventCount = (
    events
        .withWatermark("eventTimestamp", "1 minute")
        .groupBy(
            window(events.eventTimestamp, "10 minutes"),
            events.userId)
        .count()
)
```

Structured Streaming automatically creates a `window` column in the result when performing windowed aggregations. This column consists of the start and end timestamps that define each window. After an hour, the output for `userId` 1 and 2 may look like this:

```
window                                                          userId  count
{"start": "2023-06-02T11:00:00", "end": "2023-06-02T11:10:00"}  1       12
{"start": "2023-06-02T11:00:00", "end": "2023-06-02T11:10:00"}  2       7
{"start": "2023-06-02T11:10:00", "end": "2023-06-02T11:20:00"}  1       8
{"start": "2023-06-02T11:10:00", "end": "2023-06-02T11:20:00"}  2       16
```
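Since `window` is a struct column containing the start and end timestamps, its fields can be addressed directly when needed. A minimal sketch selecting from the `eventCount` DataFrame defined above:

```python
# The window column is a struct with `start` and `end` timestamp fields,
# so the raw window bounds can be selected alongside the aggregated values.
eventCount.select("window.start", "window.end", "userId", "count")
```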

To average these counts by hour, we need to perform another windowed aggregation using the previous window column’s timestamps. Previously, you would have had to write the intermediate result to a sink with `writeStream` and then read the data into a new stream for the second aggregation. However, with multiple stateful operators, you can chain both operations in the same stream.
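For reference, here is a rough sketch of that older two-query pattern; the intermediate table path and checkpoint location are hypothetical:

```python
# Old approach: persist the first aggregation, then run a second streaming query over it.
(eventCount.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "/chk/event_counts")   # hypothetical checkpoint path
    .start("/data/event_counts"))                        # hypothetical intermediate table

# A second query would then read the intermediate table back in to compute the hourly average.
intermediate = spark.readStream.format("delta").load("/data/event_counts")
```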

To facilitate this chaining, new syntax allows the window column produced by the previous aggregation to be passed directly to the `window` function. In the snippet below, the `eventCount.window` column is used this way: the `window` function recognizes the struct of start and end timestamps and builds a new window from it. The second aggregation defines a one-hour window and calculates the average of the counts per user:

```python
eventAvg = (
    eventCount
        .groupBy(
            window(eventCount.window, "1 hour"),
            eventCount.userId)
        .avg("count")  # pass the column name as a string; eventCount.count is the DataFrame method
)
```

After the second aggregation, the data for `userId` 1 and 2 would look like this:

```
window                                                          userId  avg
{"start": "2023-06-02T11:00:00", "end": "2023-06-02T12:00:00"}  1       10
{"start": "2023-06-02T11:00:00", "end": "2023-06-02T12:00:00"}  2       12.5
```

Finally, we write the resulting DataFrame to a sink using `writeStream`. In this example, we use a Delta table as the sink, but any sink that supports the “append” output mode is compatible. Since we’re using append mode, data is written to the sink only when a window closes, which happens once the watermark passes the window’s end time (equivalently, once the latest event timestamp seen exceeds the window end plus the allowed lateness). Here’s the code snippet for writing to the sink:

```python
(eventAvg.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", checkpointPath)
    .trigger(processingTime="30 seconds")
    .queryName("eventRate")
    .start(outputPath)
)
```

Conclusion

Apache Spark’s Structured Streaming introduces the capability to handle multiple stateful operators within a single stream, offering improved efficiency, reduced complexity, lower latency, and cost savings. By leveraging stateful and stateless operators, data engineers can perform various operations on large volumes of streaming data. Additionally, the flexibility of watermarking enables effective management of late data and state retention. With Structured Streaming and the forthcoming enhancements in Apache Spark 3.5.0, data engineers can process terabytes or even petabytes of data with ease, meeting the demands of today’s business requirements.

Summary: Structured Streaming: An Exploration of Multiple Stateful Operators

Structured Streaming in Apache Spark is a powerful tool for processing large volumes of data in real-time with low latency. With the introduction of multiple stateful operators, it is now possible to perform complex data operations within a single stream, reducing complexity, latency, and cost. Stateful operators require the use of watermarks, which specify the allowed lateness of data and how long to retain state. Using examples, this article demonstrates how to chain multiple stateful operators, such as windowed aggregations, within a stream and write the results to a sink. This feature enhances the functionality of Structured Streaming and makes it an ideal choice for stream processing.
