
From Configuration to Orchestration: Building an ETL Workflow with AWS Is No Longer a Struggle

AWS leads the cloud industry with a whopping 32% market share, thanks to its early market entry, robust technology, and comprehensive service offerings. However, many users find the move to AWS a bumpy journey, and that frustration drives many companies and organizations toward competitors such as Google Cloud Platform.

Despite the steep learning curve, AWS remains a leading cloud provider because of its reliability, hybrid-cloud capabilities, and wide range of services. Most importantly, choosing the right strategy can significantly reduce configuration headaches, streamline your workflow, and improve performance.

In this article, I will share an effective way to set up a complete ETL pipeline with orchestration on AWS, based on my own experience. If you already build data products on AWS, it may offer a fresh perspective; if this is your first time using AWS for such tasks, it should spare you some of the configuration struggle.

A strategy for designing the right data pipeline

AWS is best known for the breadth of its core services. Building a production-grade data warehouse pipeline on AWS requires at least the following services:

  • IAM – although this service is not itself a step in the workflow, it is the foundation for accessing all the other services.
  • AWS S3 – data lake storage
  • AWS Glue – ETL processing
  • Amazon Redshift – data warehouse
  • CloudWatch – monitoring and logging

You will also need Apache Airflow if you want more advanced scheduling and retry/error-handling logic, although Redshift itself can handle some basic scheduling.

To make your work easier, I highly recommend installing an IDE (Visual Studio Code, PyCharm, or whichever you prefer). An IDE makes it much easier to write complex Python code, debug locally, integrate with version control, and work with third-party tools. In the sections below, I will walk through the process step by step.

Initial setup

Here are the initial configuration steps:

  • Launch a virtual environment in your IDE
  • Install dependencies – basically, the libraries we will need later:
pip install apache-airflow==2.7.0 boto3 pandas pyspark sqlalchemy
  • Install the AWS CLI – this lets you script interactions with the various AWS services and manage your AWS resources efficiently.
  • Run aws configure – make sure to enter your IAM user credentials when prompted:
    • AWS Access Key ID: from your IAM user.
    • AWS Secret Access Key: from your IAM user.
    • Default region name: us-east-1 (or the region you prefer)
    • Default output format: json
  • Set up Airflow – here are the steps:
    • Initialize the Airflow database
    • Create an admin user in Airflow
    • Launch the webserver on port 8080 (login: admin / admin)
    • Open another terminal tab and start the scheduler
# Initialize Airflow
export AIRFLOW_HOME=$(pwd)/airflow
airflow db init

# Create an admin user
airflow users create \
  --username admin \
  --password admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email [email protected]

# Run the webserver
airflow webserver --port 8080

# Start the scheduler (in a separate terminal tab)
airflow scheduler
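
Before moving on, it is worth a quick check that the credentials you entered with aws configure are actually picked up by boto3, since every later step depends on them. Below is a minimal sanity-check sketch; the file name is arbitrary, and it assumes your IAM user is allowed to call STS and list S3 buckets:

# check_aws_setup.py -- quick sanity check of the configured credentials (hypothetical helper)
import boto3

# STS reports which IAM identity boto3 resolved from ~/.aws/credentials
sts = boto3.client("sts")
identity = sts.get_caller_identity()
print(f"Authenticated as {identity['Arn']} (account {identity['Account']})")

# Listing buckets confirms the same credentials also reach S3
# (requires the s3:ListAllMyBuckets permission on your IAM user)
s3 = boto3.client("s3")
buckets = [bucket["Name"] for bucket in s3.list_buckets()["Buckets"]]
print(f"Visible S3 buckets: {buckets}")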

Development workflow: a demo with the COVID-19 dataset

For demonstration purposes, I use the Johns Hopkins University (JHU CSSE) COVID-19 dataset (CC BY 4.0 license). You can find the data here.

The chart below shows the workflow, from ingesting the data to loading it into the development tables.

Development workflow (image by the author)

Data ingestion

In the first phase, ingesting the data into AWS S3, I process the data by melting it into a long format and converting the date format. The data is stored in Parquet format to improve storage efficiency, speed up queries, and reduce cost. The code for this step is below:

import pandas as pd
from datetime import datetime
import os
import boto3
import sys

def process_covid_data():
    try:
        # Load raw data
        url = "
        df = pd.read_csv(url)
        
        # --- Data Processing ---
        # 1. Melt to long format
        df = df.melt(
            id_vars=['Province/State', 'Country/Region', 'Lat', 'Long'], 
            var_name='date_str',
            value_name='confirmed_cases'
        )
        
        # 2. Convert dates (JHU format: MM/DD/YY) and drop rows that fail to parse
        df['date'] = pd.to_datetime(
            df['date_str'], 
            format='%m/%d/%y',
            errors='coerce'
        )
        df = df.dropna(subset=['date'])
        
        # 3. Save as partitioned Parquet
        output_dir = "covid_processed"
        df.to_parquet(
            output_dir,
            engine='pyarrow',
            compression='snappy',
            partition_cols=['date']
        )
        
        # 4. Upload to S3
        s3 = boto3.client('s3')
        total_files = 0
        
        for root, _, files in os.walk(output_dir):
            for file in files:
                local_path = os.path.join(root, file)
                s3_path = os.path.join(
                    'raw/covid/',
                    os.path.relpath(local_path, output_dir)
                )
                s3.upload_file(
                    Filename=local_path,
                    Bucket='my-dev-bucket',
                    Key=s3_path
                )
            total_files += len(files)
        
        print(f"Successfully processed and uploaded {total_files} Parquet files")
        print(f"Data covers from {df['date'].min()} to {df['date'].max()}")
        return True

    except Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        return False

if __name__ == "__main__":
    process_covid_data()

After running the Python code, you should be able to see the Parquet files in your S3 bucket under the raw/covid/ prefix.

Screenshot by the author
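
If you prefer to verify the upload from code rather than in the S3 console, a small boto3 listing does the job. This is just a convenience sketch; the bucket name and prefix match the ones used in the ingestion script above:

import boto3

s3 = boto3.client("s3")

# Paginate in case there are more than 1,000 partitioned Parquet files
paginator = s3.get_paginator("list_objects_v2")
count = 0
for page in paginator.paginate(Bucket="my-dev-bucket", Prefix="raw/covid/"):
    count += len(page.get("Contents", []))

print(f"Found {count} objects under s3://my-dev-bucket/raw/covid/")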

ETL Pipeline Development

AWS Glue is used mainly for ETL pipeline development. Although it can also be used for data ingestion or for loading data into S3, its real strength lies in processing data once it sits in the data lake. Here is the PySpark script that transforms the data:

# transform_covid.py
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import current_date

glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": ["s3://my-dev-bucket/raw/covid/"]},
    format="parquet"
).toDF()

# Add transformations here
df_transformed = df.withColumn("load_date", current_date())

# Write to processed zone
df_transformed.write.parquet(
    "s3://my-dev-bucket/processed/covid/",
    mode="overwrite"
)
Screenshot by the author
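
If you prefer to register and run this script as a Glue job from code instead of the console, here is a hedged sketch using boto3's Glue API; the job name, IAM role, and script location below are placeholders you would replace with your own:

import boto3

glue = boto3.client("glue")

# Register the PySpark script (already uploaded to S3) as a Glue job.
# The job name, IAM role, and script location are placeholders.
glue.create_job(
    Name="dev_covid_transformation",
    Role="arn:aws:iam::your-account-id:role/GlueServiceRole",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-dev-bucket/scripts/transform_covid.py",
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
)

# Start a run and print its id so you can follow it in the console or CloudWatch
run = glue.start_job_run(JobName="dev_covid_transformation")
print(f"Started Glue job run: {run['JobRunId']}")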

The next step is to load the data into Redshift. In the Redshift console, open “Query editor v2” from the left-hand menu; there you can write your SQL and run the Redshift COPY command.

-- Create a table covid_data in the dev schema
CREATE TABLE dev.covid_data (
    "Province/State" VARCHAR(100),  
    "Country/Region" VARCHAR(100),
    "Lat" FLOAT8,
    "Long" FLOAT8,
    date_str VARCHAR(100),
    confirmed_cases FLOAT8  
)
DISTKEY("Country/Region")   
SORTKEY(date_str);
-- COPY data into Redshift
COPY dev.covid_data (
    "Province/State",
    "Country/Region",
    "Lat",
    "Long",
    date_str,
    confirmed_cases
)
FROM 's3://my-dev-bucket/processed/covid/'
IAM_ROLE 'arn:aws:iam::your-account-id:role/RedshiftLoadRole'
REGION 'your-region'
FORMAT PARQUET;

You should then see the data successfully loaded into the data warehouse.

Screenshot by the author
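
If you want to double-check the load without leaving your IDE, you can run a quick count through the Redshift Data API. Below is a minimal sketch assuming a provisioned cluster; the cluster identifier, database, and user are placeholders (for Redshift Serverless you would pass WorkgroupName instead of ClusterIdentifier and DbUser):

import time
import boto3

client = boto3.client("redshift-data")

# Submit the query asynchronously; cluster, database, and user are placeholders
resp = client.execute_statement(
    ClusterIdentifier="your-cluster-id",
    Database="dev",
    DbUser="awsuser",
    Sql="SELECT COUNT(*) FROM dev.covid_data;",
)

# Poll until the statement finishes, then read the single result cell
while True:
    desc = client.describe_statement(Id=resp["Id"])
    if desc["Status"] in ("FINISHED", "FAILED", "ABORTED"):
        break
    time.sleep(1)

if desc["Status"] == "FINISHED":
    result = client.get_statement_result(Id=resp["Id"])
    print("Rows in dev.covid_data:", result["Records"][0][0]["longValue"])
else:
    print("Query did not finish:", desc.get("Error"))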

Pipeline automation

The easiest way to automate your data pipeline is to schedule the job in the Redshift query editor by creating a stored procedure (for more detail on the SQL side, you can refer to this article).

CREATE OR REPLACE PROCEDURE dev.run_covid_etl()
AS $$
BEGIN
  TRUNCATE TABLE dev.covid_data;
  COPY dev.covid_data 
  FROM 's3://my-dev-bucket/raw/covid'
  IAM_ROLE 'arn:aws:iam::your-account-id:role/RedshiftLoadRole'
  REGION 'your-region'
  FORMAT PARQUET;
END;
$$ LANGUAGE plpgsql;
Screenshot by the author

Alternatively, you can run it with Airflow as a scheduled job.

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 2
}

with DAG(
    'redshift_etl_dev',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:

    run_etl = RedshiftSQLOperator(
        task_id='run_covid_etl',
        redshift_conn_id='redshift_dev',
        sql='CALL dev.run_covid_etl()',
    )

Production workflow

An Airflow DAG is a powerful way to orchestrate your entire ETL pipeline when there are many dependencies, and using one is good practice in a production environment.

After refining and testing your ETL pipeline in development, you can deploy the jobs to the production environment with Airflow.

Production workflow (image by the author)

Here is a checklist of the key preparation steps for a successful deployment with Airflow:

  • Create the S3 bucket my-prod-bucket
  • Create the Glue job prod_covid_transformation in the AWS console
  • Create the Redshift stored procedure prod.load_covid_data()
  • Set up Airflow
  • Configure SMTP for email notifications in airflow.cfg

The Airflow DAG for the production data pipeline then looks like this:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.operators.email import EmailOperator

# 1. DAG CONFIGURATION
default_args = {
    'owner': 'data_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 1)
}

# 2. DATA INGESTION FUNCTION
def load_covid_data():
    import pandas as pd
    import boto3
    
    url = "
    df = pd.read_csv(url)

    df = df.melt(
        id_vars=['Province/State', 'Country/Region', 'Lat', 'Long'], 
        var_name='date_str',
        value_name='confirmed_cases'
    )
    df['date'] = pd.to_datetime(df['date_str'], format='%m/%d/%y')
    
    df.to_parquet(
        's3://my-prod-bucket/raw/covid/',
        engine='pyarrow',
        partition_cols=['date']
    )

# 3. DAG DEFINITION
with DAG(
    'covid_etl',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Task 1: Ingest Data
    ingest = PythonOperator(
        task_id='ingest_data',
        python_callable=load_covid_data
    )

    # Task 2: Transform with Glue
    transform = GlueJobOperator(
        task_id='transform_data',
        job_name='prod_covid_transformation',
        script_args={
            '--input_path': 's3://my-prod-bucket/raw/covid/',
            '--output_path': 's3://my-prod-bucket/processed/covid/'
        }
    )

    # Task 3: Load to Redshift
    load = RedshiftSQLOperator(
        task_id='load_data',
        sql="CALL prod.load_covid_data()"
    )

    # Task 4: Notifications
    notify = EmailOperator(
        task_id='send_email',
        to='your-email-address',
        subject='ETL Status: {{ ds }}',
        html_content='ETL job completed: View Logs'
    )

    # Chain the tasks so they run in order
    ingest >> transform >> load >> notify

Final thoughts

Some users, especially those new to the cloud or looking for simpler solutions, are put off by AWS's high barrier to entry and frustrated by its overwhelming range of services. Even so, I think the effort is worthwhile, for a couple of reasons:

  • The configuration process, together with building and testing the data pipeline, gives you a solid understanding of the typical workflow of real-world data engineering, skills that will carry over to your own projects and to other cloud platforms such as Azure, GCP, and Alibaba Cloud.
  • AWS's mature ecosystem offers a broad catalogue of services, so users can tailor their infrastructure and enjoy the flexibility this brings to their projects.

Thank you for reading! I hope this article helps you build your own cloud-based data pipeline!
