Improving code quality during data transformation with Polars

Image created with AI by Dall-E

In our daily lives as Data/Analytics Engineers, writing ETL/ELT workflows and pipelines (or maybe your company uses a different term) is a routine and integral part of our work. In this article, however, I will focus only on the Transformation phase. Why? Because this is the phase where data from different sources and of different types becomes meaningful to the business. It is also an incredibly delicate phase, because a single mistake can immediately mislead users and make them lose trust in your data.

To illustrate the process of improving code quality, let’s look at a hypothetical example. Imagine a website where we log user actions, such as what they viewed and purchased. We have user_id for the user ID, product_id for the product, action_type for the type of action (a view or purchase), and action_dt for the timestamp of the action.

from dataclasses import dataclass
from datetime import datetime, timedelta
from random import choice, gauss, randrange, seed
from typing import Any, Dict

import polars as pl

seed(42)
base_time = datetime(2024, 8, 9, 0, 0, 0, 0)

user_actions_data = (
    {
        "user_id": randrange(10),
        "product_id": choice(("0001", "0002", "0003")),
        # gauss() with no arguments requires Python 3.11+ (defaults mu=0.0, sigma=1.0)
        "action_type": ("purchase" if gauss() > 0.6 else "view"),
        "action_dt": base_time - timedelta(minutes=randrange(100_000)),
    }
    for _ in range(100_000)
)

user_actions_df = pl.DataFrame(user_actions_data)

In addition, for our task we need a product catalog, which in our case only contains product_id and the corresponding price. Our data is now ready for the example.

product_catalog_data = {"product_id": ("0001", "0002", "0003"), "price": (10, 30, 70)}
product_catalog_df = pl.DataFrame(product_catalog_data)
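
As a quick sanity check, we can print the schemas Polars inferred for both frames (the output below is abbreviated and shown as comments):

print(user_actions_df.schema)
# user_id: Int64, product_id: String, action_type: String, action_dt: Datetime
print(product_catalog_df.schema)
# product_id: String, price: Int64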

Now let’s tackle our first task: creating a report that contains the total purchase amount and the ratio of purchased items to viewed items from the previous day for each user. This task is not particularly complex and can be implemented quickly. Here’s what it might look like using Polars:

yesterday = base_time - timedelta(days=1)
result = (
    user_actions_df.filter(pl.col("action_dt").dt.date() == yesterday.date())
    .join(product_catalog_df, on="product_id")
    .group_by(pl.col("user_id"))
    .agg(
        (
            pl.col("price")
            .filter(pl.col("action_type") == "purchase")
            .sum()
        ).alias("total_purchase_amount"),
        (
            pl.col("product_id").filter(pl.col("action_type") == "purchase").len()
            / pl.col("product_id").filter(pl.col("action_type") == "view").len()
        ).alias("purchase_to_view_ratio"),
    )
    .sort("user_id")
)

“This is a working solution that can be deployed to production,” some would say. But not us, since you opened this article. At the beginning, I emphasized that I would focus specifically on the transformation step.

If we think about maintaining this code in the long term, testing it, and the fact that there will be hundreds of such reports, we have to recognize that each subsequent developer will understand this code less well than the previous one, increasing the likelihood of errors with each change.

I would like to reduce this risk and therefore I have come up with the following approach:

Step 1: Let’s move all the business logic into a separate class, for example DailyUserPurchaseReport.

@dataclass
class DailyUserPurchaseReport:
    ...

Step 2: Let’s define the arguments that this class should accept: sources – various sources that we need for our work, and params – variable parameters that can change, in our case this could be the report date.

@dataclass
class DailyUserPurchaseReport:
    sources: Dict[str, pl.LazyFrame]
    params: Dict[str, Any]

Step 3: Define a method that performs the transformation, for example execute.

@dataclass
class DailyUserPurchaseReport:
    sources: Dict[str, pl.LazyFrame]
    params: Dict[str, Any]

    def execute(self) -> pl.DataFrame:
        pass

Step 4: Break the entire process into separate functions that each accept a pl.LazyFrame and return a pl.LazyFrame.

@dataclass
class DailyUserPurchaseReport:
    sources: Dict[str, pl.LazyFrame]
    params: Dict[str, Any]

    def _filter_actions_by_date(self, frame: pl.LazyFrame) -> pl.LazyFrame:
        pass

    def _enrich_user_actions_from_product_catalog(
        self, frame: pl.LazyFrame
    ) -> pl.LazyFrame:
        pass

    def _calculate_key_metrics(self, frame: pl.LazyFrame) -> pl.LazyFrame:
        pass

    def execute(self) -> pl.DataFrame:
        pass

Step 5: Now use the magic pipe function to connect our entire pipeline together. This is exactly why we use pl.LazyFrame everywhere:

    def execute(self) -> pl.DataFrame:
        result: pl.DataFrame = (
            self.sources["user_actions"]
            .pipe(self._filter_actions_by_date)
            .pipe(self._enrich_user_actions_from_product_catalog)
            .pipe(self._calculate_key_metrics)
            .collect()
        )
        return result

It is recommended to use LazyFrame for piping operations to take full advantage of query optimization and parallelization.
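
To see what this buys us, here is a minimal, self-contained sketch with hypothetical toy frames (not the report data) that prints the optimized query plan. Note how a filter written after the join is pushed down by the optimizer so that fewer rows reach the join:

import polars as pl

events = pl.LazyFrame({"key": [1, 2, 3], "value": [10, 20, 30]})
prices = pl.LazyFrame({"key": [1, 2, 3], "price": [5, 7, 9]})

query = (
    events.join(prices, on="key")
    .filter(pl.col("value") > 15)  # written after the join...
)

# ...but explain() shows the optimized plan, where predicate pushdown
# applies the filter to the join's left input before the join runs.
print(query.explain())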

The final code:

@dataclass
class DailyUserPurchaseReport:
    """
    Generates a report containing the total purchase amount and the ratio of purchased items
    to viewed items from the previous day for each user.

    Attributes:
        sources (Dict[str, pl.LazyFrame]): A dictionary containing the data sources, including:
            - 'user_actions': A LazyFrame containing user actions data.
            - 'product_catalog': A LazyFrame containing product catalog data.
        params (Dict[str, Any]): A dictionary containing parameters, including:
            - 'report_date': The date for which the report should be generated (previous day).
    """

    sources: Dict[str, pl.LazyFrame]
    params: Dict[str, Any]

    def _filter_actions_by_date(self, frame: pl.LazyFrame) -> pl.LazyFrame:
        """
        Filters user actions data to include only records from the specified date.

        Args:
            frame (pl.LazyFrame): A LazyFrame containing user actions data.

        Returns:
            pl.LazyFrame: A LazyFrame containing user actions data filtered by the specified date.
        """
        return frame.filter(
            pl.col("action_dt").dt.date() == self.params["report_date"].date()
        )

    def _enrich_user_actions_from_product_catalog(
        self, frame: pl.LazyFrame
    ) -> pl.LazyFrame:
        """
        Joins the user actions data with the product catalog to include product prices.

        Args:
            frame (pl.LazyFrame): A LazyFrame containing user actions data.

        Returns:
            pl.LazyFrame: A LazyFrame containing user actions data enriched with product prices.
        """
        return frame.join(self.sources["product_catalog"], on="product_id")

    def _calculate_key_metrics(self, frame: pl.LazyFrame) -> pl.LazyFrame:
        """
        Calculates the total purchase amount and the ratio of purchased items to viewed items.

        Args:
            frame (pl.LazyFrame): A LazyFrame containing enriched user actions data.

        Returns:
            pl.LazyFrame: A LazyFrame containing the total purchase amount and
                purchase-to-view ratio for each user.
        """
        return (
            frame.group_by(pl.col("user_id"))
            .agg(
                (
                    pl.col("price")
                    .filter(pl.col("action_type") == "purchase")
                    .sum()
                ).alias("total_purchase_amount"),
                (
                    pl.col("product_id")
                    .filter(pl.col("action_type") == "purchase")
                    .len()
                    / pl.col("product_id").filter(pl.col("action_type") == "view").len()
                ).alias("purchase_to_view_ratio"),
            )
            .sort("user_id")
        )

    def execute(self) -> pl.DataFrame:
        """
        Executes the report generation process.

        This method performs the following steps:
        1. Filters user actions data to include only records from the previous day.
        2. Joins the filtered user actions data with the product catalog.
        3. Calculates the total purchase amount and purchase-to-view ratio for each user.
        4. Returns the final report as a DataFrame.

        Returns:
            pl.DataFrame: A DataFrame containing the total purchase amount and
                purchase-to-view ratio for each user.
        """
        result: pl.DataFrame = (
            self.sources["user_actions"]
            .pipe(self._filter_actions_by_date)
            .pipe(self._enrich_user_actions_from_product_catalog)
            .pipe(self._calculate_key_metrics)
            .collect()
        )
        return result

Let’s take a look at it in action:

# prepare sources
user_actions: pl.LazyFrame = user_actions_df.lazy()
product_catalog: pl.LazyFrame = product_catalog_df.lazy()

# get report date
yesterday: datetime = base_time - timedelta(days=1)

# report calculation
df: pl.DataFrame = DailyUserPurchaseReport(
    sources={"user_actions": user_actions, "product_catalog": product_catalog},
    params={"report_date": yesterday},
).execute()

Result:

┌─────────┬───────────────────────┬────────────────────────┐
│ user_id ┆ total_purchase_amount ┆ purchase_to_view_ratio │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ f64 │
╞═════════╪═══════════════════════╪════════════════════════╡
│ 0 ┆ 1880 ┆ 0.422018 │
│ 1 ┆ 1040 ┆ 0.299065 │
│ 2 ┆ 2220 ┆ 0.541667 │
│ 3 ┆ 1480 ┆ 0.436782 │
│ 4 ┆ 1240 ┆ 0.264463 │
│ 5 ┆ 930 ┆ 0.254717 │
│ 6 ┆ 1080 ┆ 0.306122 │
│ 7 ┆ 1510 ┆ 0.345133 │
│ 8 ┆ 2050 ┆ 0.536842 │
│ 9 ┆ 1320 ┆ 0.414414 │
└─────────┴───────────────────────┴────────────────────────┘

Bonus

For those using Test-Driven Development (TDD), this approach is especially useful. TDD emphasizes writing tests before actual implementation. By having clearly defined, small functions, you can write precise tests for each part of the transformation process, ensuring that each function behaves as expected. This not only makes the process smoother, but also ensures that your transformations are thoroughly validated at every step.
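
For example, a minimal pytest sketch for the date filter could look like this (the module name report is hypothetical; adjust the import to wherever the class lives):

from datetime import datetime

import polars as pl

from report import DailyUserPurchaseReport  # hypothetical module path


def test_filter_actions_by_date_keeps_only_report_date():
    # The method under test only reads params["report_date"],
    # so empty sources are fine here.
    job = DailyUserPurchaseReport(
        sources={},
        params={"report_date": datetime(2024, 8, 8)},
    )
    frame = pl.LazyFrame(
        {
            "user_id": [1, 2],
            "action_dt": [
                datetime(2024, 8, 8, 12, 30),  # on the report date
                datetime(2024, 8, 7, 9, 0),  # the day before
            ],
        }
    )
    result = job._filter_actions_by_date(frame).collect()
    assert result["user_id"].to_list() == [1]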

Conclusion

In this article, I’ve outlined a structured approach to improving code quality in your data workflows using Polars. By isolating the transformation step and breaking the process down into discrete, manageable parts, we ensure that our code is both robust and maintainable. By using pl.LazyFrame and the pipe function, we take full advantage of Polars’ query optimization and parallelization capabilities. This approach not only improves the efficiency of our data transformations, but also ensures the integrity and business relevance of the data we work with. By following these steps, you can create more reliable and scalable data workflows, ultimately leading to better data-driven decision making.

Share your experience

If you have any experience or useful tips, please share your thoughts in the comments. It is always interesting to learn about the experiences of other developers.


Improving Code Quality During Data Transformation with Polars was originally published in Towards Data Science on Medium.