What Can We Do When Memory Becomes the New Bottleneck for Data Engineers?

Memory has undoubtedly become a critical resource. As AI-driven demand for memory and storage infrastructure reaches historic highs, companies like Micron Technology and Sandisk have attracted unprecedented attention and are raising product prices on the back of strong pricing power. But this is not good news for companies that build data-intensive applications, rely on high-volume storage for AI training, run large-scale analytics, or operate on thin margins in cloud services.

For data engineers, this is not just a market story. It is a daily burden. When RAM and flash become more expensive, the old “add more capacity” reflex no longer works. Budgets do not grow in proportion to data volume, and cloud bills come under scrutiny. As data engineers, what can we do when our dataset doubles but the cluster doesn't? We have to get creative.

In this article, I will start with a real-world ETL challenge: transforming more than 6 million social media posts with mixed-type fields on limited computing capacity. I'll then walk through several solutions, both classic and more advanced, that keep an ETL pipeline running without hardware or cloud upgrades.

Problem: A 6.2 Million-Row Dataset Will Not Fit in Memory

The story starts with a new ETL pipeline that you need to build. The raw data is 6.2 million social media posts. The dataset is extracted from a social media API and expands to more than 200 columns after JSON flattening. The most problematic part is that a large number of fields contain mixed data types.

Here are some examples of these columns in native JSON format:

{
  "reaction_count": 1250
}

{
  "reaction_count": "1250"
}

{
  "reaction_count": null
}

{
  "hashtags": ["AI", "Python"]
}

{
  "hashtags": "AI"
}

{
  "hashtags": null
}

Because PySpark requires a consistent schema and the social media API schema changes from time to time, Pandas is a reasonable choice for handling the mixed-type columns. Unlike PySpark, Pandas stores such columns with the object dtype by default and does not require every row to conform to the same schema.
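To see that behavior on a small scale, here is a minimal sketch that loads three hypothetical records mirroring the JSON examples above. The record values and the names sample_df, column_dtypes, and reaction_types are assumptions made for illustration, not part of the original pipeline.

```python
import pandas as pd

# Hypothetical records that mirror the JSON examples above.
records = [
    {"reaction_count": 1250, "hashtags": ["AI", "Python"]},
    {"reaction_count": "1250", "hashtags": "AI"},
    {"reaction_count": None, "hashtags": None},
]
sample_df = pd.DataFrame(records)

# Columns with mixed Python types fall back to the generic object dtype.
column_dtypes = sample_df.dtypes.astype(str).to_dict()

# Each cell keeps the Python type it arrived with.
reaction_types = [type(value).__name__ for value in sample_df["reaction_count"]]

print(column_dtypes)
print(reaction_types)
```

Pandas accepts all three rows without complaint, which is exactly why the cleanup has to happen explicitly. The full-column normalization used in this pipeline looks like this: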

import pandas as pd

def normalize_mixed_columns(df, mixed_columns):
    """
    Convert mixed-type columns to strings.
    """

    cleaned_df = df.copy()

    for column in mixed_columns:

        cleaned_df[column] = (
            cleaned_df[column]
            .where(cleaned_df[column].isna(),
                   cleaned_df[column].astype(str))
        )

    return cleaned_df

social_posts_clean = normalize_mixed_columns(
    social_posts_df,
    mixed_columns
)

Simple and straightforward. However, when I ran this code, the process was terminated because its memory usage exceeded the available resources.
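One likely contributor is that converting a whole column with astype(str) materializes a second full-length column of Python string objects next to the original. The sketch below compares the footprint of an int64 column with the same values stored as Python strings. The row count is a small stand-in chosen for illustration, and the exact byte counts depend on the Pandas and Python versions.

```python
import pandas as pd

n_rows = 100_000  # small stand-in for the 6.2 million rows in the article

# The same values stored two ways: packed 8-byte integers vs. Python str objects.
counts = pd.Series(range(n_rows), dtype="int64")
as_text = pd.Series([str(value) for value in range(n_rows)], dtype=object)

# deep=True also counts the Python objects that an object column points to.
numeric_bytes = counts.memory_usage(index=False, deep=True)
text_bytes = as_text.memory_usage(index=False, deep=True)

print(f"int64 column:  {numeric_bytes / 1e6:.1f} MB")
print(f"object column: {text_bytes / 1e6:.1f} MB")
```

Multiplied across many mixed-type columns and millions of rows, the temporary string column alone can exceed what a worker has available.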

A General Solution: Reducing Peak Memory with Chunk-Based Processing

The bottleneck is the 6.2 million rows. The dataset is about 30GB, which is larger than the memory of a typical cloud worker instance, and the intermediate DataFrames created during conversion push usage past the available worker memory. So instead of converting an entire column at once, which forces Pandas to create large temporary objects in memory, the chunked approach processes each column in slices. In this case, a chunk size of 250,000 rows is appropriate. Pandas then only needs to process 250k rows at a time, free the memory, and move on to the next chunk.
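As a quick check of the arithmetic, the sketch below enumerates the row ranges that a 250,000-row chunk size produces for 6.2 million rows. The helper name chunk_bounds is hypothetical and introduced only for this illustration.

```python
import math

def chunk_bounds(n_rows, chunk_size):
    """Yield (start, end) row offsets that cover n_rows in order."""
    for start in range(0, n_rows, chunk_size):
        yield start, min(start + chunk_size, n_rows)

n_rows = 6_200_000
chunk_size = 250_000

bounds = list(chunk_bounds(n_rows, chunk_size))
n_chunks = math.ceil(n_rows / chunk_size)

print(n_chunks)     # number of slices per column
print(bounds[0])    # first slice
print(bounds[-1])   # last, shorter slice
```

Each mixed-type column is therefore walked in 25 slices, with the last slice holding the remaining 200,000 rows.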

import gc

def normalize_mixed_columns_chunked(
    df,
    mixed_columns,
    chunk_size=250000
):

    cleaned_df = df.copy()

    for column in mixed_columns:

        col_idx = cleaned_df.columns.get_loc(column)

        for start in range(0, len(cleaned_df), chunk_size):

            end = min(start + chunk_size, len(cleaned_df))

            chunk = cleaned_df.iloc[start:end, col_idx]

            mask = chunk.notna()

            if mask.any():

                chunk = chunk.astype(object)

                chunk.loc[mask] = (
                    chunk.loc[mask]
                    .astype(str)
                    .values
                )

                cleaned_df.iloc[start:end, col_idx] = chunk.values

            del chunk
            del mask
            gc.collect()

    return cleaned_df


social_posts_clean = normalize_mixed_columns_chunked(
    social_posts_df,
    mixed_columns
)

Once peak memory is low enough, the data conversion completes successfully and the pipeline stabilizes. However, the running time becomes very long. This is not surprising, since chunking is a trade-off: it trades execution speed for pipeline reliability.
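To put numbers on that trade-off, it helps to record both runtime and peak memory for each variant. Below is a minimal sketch using the standard library. The helper name measure is hypothetical, and tracemalloc only sees allocations made through Python's memory allocators, so the figures should be read as approximate.

```python
import time
import tracemalloc

def measure(func, *args, **kwargs):
    """Run func and return (result, elapsed_seconds, peak_traced_bytes)."""
    tracemalloc.start()
    started = time.perf_counter()
    try:
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - started
        # get_traced_memory() returns (current, peak) since tracing started.
        _, peak_bytes = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, elapsed, peak_bytes

# Stand-in workload: turn 10,000 integers into strings.
result, elapsed, peak_bytes = measure(lambda n: [str(i) for i in range(n)], 10_000)
print(f"{elapsed:.4f} s, peak {peak_bytes / 1e6:.2f} MB")
```

Wrapping both normalize_mixed_columns and normalize_mixed_columns_chunked in the same helper on a sample of the data gives a like-for-like comparison before committing to a chunk size.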

From Manual Chunking to Automatic Parallel Execution

Instead of manual chunking in Pandas, Dask can split the DataFrame into many smaller partitions automatically and avoid memory crashes during data conversion. But its internal execution mechanics differ from manual chunking. When I set chunk_size in Pandas, the code processes one chunk, releases it from RAM, and moves on to the next one. It uses one CPU core at a time, so on cloud instances that offer multiple CPU cores it does not fully use the available compute. Additionally, I have to write the loop that combines the results by hand, which makes the code long and complicated.

Dask splits the dataset into partitions automatically. It builds a task graph and schedules the partitions across all available CPU cores, which can significantly reduce runtime.

However, we cannot ignore the pitfall of mixed data types in Dask. Because a Dask DataFrame is made up of many Pandas DataFrame partitions, Dask infers the data types from a sample of the data when reading CSV or JSON files. If a column contains inconsistent values, for example empty strings (“”), None, integers, and strings, Dask will likely raise a ValueError, TypeError, or metadata error. This happens because Dask infers the column's data type from the first sample and may guess, for instance, that the column is an integer. If it then encounters a string in a later partition, Dask raises an error.

To solve this problem, we must explicitly specify the expected column data types instead of relying on automatic inference. The code below uses Dask to convert the mixed-type columns.

import dask.dataframe as dd

df = dd.read_parquet(
    "social_posts.parquet",
    engine = "pyarrow"
)

mixed_columns = [
    "hashtags",
    "mentions",
    "location",
    "reaction_count",
]

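for column in mixed_columns:
    # meta tells Dask the output name and dtype so it does not have to infer them.
    # Note: map(str) also turns missing values into strings such as "None" or "nan".
    df[column] = df[column].map(str, meta=(column, 'str'))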

df.to_parquet("social_posts_clean/", engine="pyarrow")

Dask is not as flexible as Pandas chunking when handling dynamic columns with mixed data types, because you need to specify which columns to convert. It also still runs Pandas code within each partition, so workloads dominated by Python object columns can remain memory-intensive and slow on million-row datasets. Is there a more CPU-cache-efficient solution?

Another Strong Option: Polars

You might ask whether there is a way to balance speed and memory efficiency. The answer is Polars, a DataFrame library implemented in Rust. Unlike Python, Rust compiles to optimized native machine code and offers tighter memory management: it reduces memory allocations and has no garbage-collection overhead. However, Rust also has its problems. Development is slower than in Python because of strict compiler checks, and its learning curve is extremely steep. Those are among the reasons it has not become as popular as Python since it first appeared in 2010. Does that mean data engineers cannot use this approach if they are not familiar with Rust?

Not at all. Polars is a fast DataFrame library built on a Rust engine and exposed through a Python API. Development started in 2020, and it is designed to handle large datasets much faster than Pandas. It retains the power of the Rust engine while letting Python users work with an ordinary Python library.

Polars uses the Apache Arrow in-memory columnar data format, which is designed to reduce memory copies while maximizing CPU cache efficiency. Operations such as .cast(pl.String) run directly in compiled Rust code. These features often make Polars several times faster than Pandas while using a fraction of the memory.

Polars can also build a lazy query plan, and its query optimizer determines the most efficient execution plan before reading the data. These mechanisms minimize unnecessary memory usage. As a result, when handling datasets that exceed available memory, Polars can process the data in streaming mode instead of loading the entire dataset into RAM at once.

import polars as pl

df = pl.read_parquet("social_posts.parquet")

mixed_columns = [
    "hashtags",
    "mentions",
    "location",
    "reaction_count",
]

df = df.with_columns([
    pl.col(col).cast(pl.String) for col in mixed_columns
])

df.write_parquet(
    "social_posts_clean.parquet"
)

Polars handles memory better than the previous two solutions because it is designed from the ground up to use memory efficiently, whereas Pandas chunking and Dask only reduce memory pressure during execution.

However, Polars is not a silver bullet. It has its disadvantages. First, Polars introduces its own DataFrame API, even though it is used from Python. Common Pandas idioms such as apply(), indexing, and groupby syntax often need to be rewritten. Second, many third-party libraries are still built around Pandas, so integrating them with Polars still requires conversion between DataFrame formats.

Final Thoughts

So is Pandas obsolete? Not at all.

Each of these three methods solves a different problem. The best choice depends on your constraints rather than on which technology is newest.

If you have limited computing resources and are processing dynamic schemas, Pandas chunking is still an excellent solution. It greatly reduces peak memory usage. The trade-off is execution time. But in most production environments, a slow but stable pipeline is worth more than a fast one that fails repeatedly.

If your workload has already outgrown a single machine and you want to use more CPU cores, Dask is a better option. It automates partitioning and parallel processing. However, you should pay attention to the consistency of schema and data types, especially when working with semi-structured data.

Polars is preferred when performance is critical and you are willing to learn a new DataFrame API. Polars is often considered the strongest choice because of its Rust engine, Apache Arrow memory format, and query optimizer. Together, these features let Polars process large datasets with very low memory usage and very high performance. As with Dask, you still need to deal with the problems caused by mixed data types and ensure schema consistency.

To conclude, memory optimization is not about finding a single best solution. Rather, it's about understanding the constraints of your project and choosing the right tool. In the era of AI, the ability to optimize data pipelines under memory constraints is becoming an important skill for data engineers.

A robust ETL pipeline requires more than memory efficiency. It also depends on testability, maintainability, and deployment reliability.

This article is part of an ongoing data engineering series. If you're interested in building production-ready ETL pipelines beyond performance optimization, you might also enjoy the article Your First Job as a Data Engineer at a New Company? Make the ETL Pipeline Testable, which covers environment setup, automated testing, and AI-assisted development.

Thanks for reading!
