10 Practical Python One-Liners for Data Engineering


Image by Editor | ChatGPT
Introduction
Data engineering involves processing large volumes of data, building ETL pipelines, and maintaining data quality. Data engineers work with streaming data, monitor system performance, manage schema changes, and validate transformations across distributed systems.
Python one-liners can help streamline these activities by condensing complex tasks into single, readable statements. This article focuses on practical one-liners that solve common data engineering problems.
The one-liners presented here handle realistic tasks such as processing nested event data, analyzing database performance logs, and handling API responses with data quality checks. Let's get started.
🔗 Link to the code on GitHub
Sample Data
Let's create some sample data to run our one-liners against:
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta

# Create streaming event data
np.random.seed(42)
events = []
for i in range(1000):
    properties = {
        'device_type': np.random.choice(['mobile', 'desktop', 'tablet']),
        'page_path': np.random.choice(['/home', '/products', '/checkout']),
        'session_length': np.random.randint(60, 3600)
    }
    if np.random.random() > 0.7:
        properties['purchase_value'] = round(np.random.uniform(20, 300), 2)
    event = {
        'event_id': f'evt_{i}',
        'timestamp': (datetime.now() - timedelta(hours=np.random.randint(0, 72))).isoformat(),
        'user_id': f'user_{np.random.randint(100, 999)}',
        'event_type': np.random.choice(['view', 'click', 'purchase']),
        'metadata': json.dumps(properties)
    }
    events.append(event)

# Create database performance logs
db_logs = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=5000, freq='1min'),
    'operation': np.random.choice(['SELECT', 'INSERT', 'UPDATE'], 5000, p=[0.7, 0.2, 0.1]),
    'duration_ms': np.random.lognormal(mean=4, sigma=1, size=5000),
    'table_name': np.random.choice(['users', 'orders', 'products'], 5000),
    'rows_processed': np.random.poisson(lam=25, size=5000),
    'connection_id': np.random.randint(1, 20, 5000)
})

# Create API log data
api_logs = []
for i in range(800):
    log_entry = {
        'timestamp': datetime.now() - timedelta(minutes=np.random.randint(0, 1440)),
        'endpoint': np.random.choice(['/api/users', '/api/orders', '/api/metrics']),
        'status_code': np.random.choice([200, 400, 500], p=[0.8, 0.15, 0.05]),
        'response_time': np.random.exponential(150)
    }
    if log_entry['status_code'] == 200:
        log_entry['payload_size'] = np.random.randint(100, 5000)
    api_logs.append(log_entry)
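As a quick sanity check (a small sketch, not part of the original setup), you can confirm the sizes of the three datasets before moving on:

# Quick sanity check on the generated datasets
print(len(events))      # 1000 streaming events
print(db_logs.shape)    # (5000, 6) database log rows
print(len(api_logs))    # 800 API log entries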
1. Expand JSON Fields Into DataFrame Columns
Convert the JSON metadata field attached to each event into separate DataFrame columns for analysis.
events_df = pd.DataFrame([{**event, **json.loads(event['metadata'])} for event in events]).drop('metadata', axis=1)
This one-liner uses a list comprehension with dictionary unpacking to merge each event's top-level fields with its parsed metadata. The drop() call then removes the original metadata column, since its contents now live in separate columns.
Output:

This creates a DataFrame with 1000 rows and 8 columns, where JSON fields such as device_type and purchase_value become individual columns that can be queried and aggregated directly.
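If the one-liner feels dense, a step-by-step equivalent can help. This is a minimal sketch assuming the same events list from the sample data:

# Parse each event's metadata separately, then join the new columns back on
parsed = pd.DataFrame([json.loads(event['metadata']) for event in events])
events_df_alt = pd.DataFrame(events).drop('metadata', axis=1).join(parsed)
print(events_df_alt.shape)  # expected: (1000, 8)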
2. Identify Slow Database Operations by Type
Find database operations that take significantly longer than other operations of the same type.
outliers = db_logs.groupby('operation').apply(lambda x: x[x['duration_ms'] > x['duration_ms'].quantile(0.95)]).reset_index(drop=True)
This groups the database logs by operation type, then filters each group down to the records whose duration exceeds that operation's 95th percentile.
Truncated output:

This returns roughly 250 rows (5% of the 5000 records), each representing an operation that ran slower than 95% of operations of the same type.
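An equivalent filter can be written with transform(), which avoids the groupby-apply pattern. A small sketch assuming the same db_logs DataFrame:

# Compute each operation's 95th-percentile threshold, then filter against it
p95 = db_logs.groupby('operation')['duration_ms'].transform(lambda s: s.quantile(0.95))
outliers_alt = db_logs[db_logs['duration_ms'] > p95].reset_index(drop=True)
print(len(outliers_alt))  # roughly 5% of the 5000 rows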
3. Calculate Rolling Average API Endpoint Response Times
Monitor performance trends over time for each API endpoint using rolling windows.
api_response_trends = pd.DataFrame(api_logs).set_index('timestamp').sort_index().groupby('endpoint')['response_time'].rolling('1H').mean().reset_index()
This converts the API logs to a DataFrame, sets timestamp as the index for time-based operations, and sorts the index to guarantee monotonic order. It then groups by endpoint and applies a 1-hour rolling window over the timestamps.
Within each sliding window, mean() computes the average response time for that period. As the window slides forward, it produces a smoothed view of performance trends.
Truncated output:

The result shows how each endpoint's average response time, in milliseconds, changes over time; higher values indicate slower performance.
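To inspect a single endpoint's trend, you can filter the result. A small usage sketch (the '/api/users' endpoint comes from the sample data above):

# Look at the most recent rolling averages for one endpoint
users_trend = api_response_trends[api_response_trends['endpoint'] == '/api/users']
print(users_trend[['timestamp', 'response_time']].tail())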
4. Detect Schema Changes in Event Data
Spot where new fields appear in the event metadata that were absent from earlier events.
schema_evolution = pd.DataFrame([{k: type(v).__name__ for k, v in json.loads(event['metadata']).items()} for event in events]).fillna('missing').nunique()
This parses the JSON metadata of each event and builds a dictionary mapping each field name to its Python type name using type(v).__name__.
The resulting DataFrame has one row per event and one column for every unique field found across all events. fillna('missing') handles events that lack certain fields, and nunique() counts how many distinct values (including 'missing') appear in each column.
Output:
device_type       1
page_path         1
session_length    1
purchase_value    2
dtype: int64
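A natural follow-up is to keep only the fields whose observed type count is greater than one, since those are the candidates for schema drift. A small sketch:

# Fields with more than one observed "type" (including 'missing') indicate drift
drifting_fields = schema_evolution[schema_evolution > 1]
print(drifting_fields)  # here: purchase_value, which only appears on some events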
5. Aggregate Database Performance at the Connection Level
Create summary statistics grouped by operation type and database connection.
connection_perf = db_logs.groupby(['operation', 'connection_id']).agg({'duration_ms': ['mean', 'count'], 'rows_processed': ['sum', 'mean']}).round(2)
This groups the database logs by operation type and connection ID simultaneously, producing a fine-grained view of how different connections handle various operations.
The agg() call applies several aggregation functions at once: mean and count of duration_ms show typical latency and query frequency, while sum and mean of rows_processed reveal throughput patterns. round(2) keeps the table readable.
Output:

This creates a DataFrame with a MultiIndex showing how each connection performs across the different operation types.
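Because agg() with multiple functions produces MultiIndex columns, individual metrics are selected with a tuple. A small usage sketch:

# Pull out the average duration per (operation, connection_id) pair
avg_duration = connection_perf[('duration_ms', 'mean')]
print(avg_duration.sort_values(ascending=False).head())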
6. Generate Hourly Event Distribution Patterns
Compute event distribution patterns across the hours of the day to understand user behavior.
hourly_patterns = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby(['hour', 'event_type']).size().unstack(fill_value=0).div(pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').size(), axis=0).round(3)
This extracts the hour from the timestamp using assign() and a lambda, then builds an hour-by-event-type breakdown of counts using groupby and unstack.
The div() call normalizes by the total number of events in each hour, so the result shows relative proportions rather than raw counts.
Truncated output:

This returns a matrix showing the proportion of each event type (view, click, purchase) for every hour of the day, revealing user behavior patterns and the peak hours for different actions.
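The one-liner recomputes the hourly totals inside div(); a slightly more readable equivalent computes them once. A minimal sketch assuming the same events list:

# Build the hour x event_type count matrix, then normalize each row
hourly = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour)
counts = hourly.groupby(['hour', 'event_type']).size().unstack(fill_value=0)
hourly_patterns_alt = counts.div(counts.sum(axis=1), axis=0).round(3)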
7. Calculate an API Error Rate Summary by Status Code
Monitor API health by analyzing how errors are distributed across endpoints.
error_breakdown = pd.DataFrame(api_logs).groupby(['endpoint', 'status_code']).size().unstack(fill_value=0).div(pd.DataFrame(api_logs).groupby('endpoint').size(), axis=0).round(3)
This groups the API logs by both endpoint and status_code, then uses size() to count occurrences and unstack() to pivot the status codes into columns. The div() call normalizes by each endpoint's total request count, turning raw counts into proportions and revealing which endpoints have higher error rates and which kinds of errors they produce.
Output:
status_code       200    400    500
endpoint
/api/metrics    0.789  0.151  0.060
/api/orders     0.827  0.140  0.033
/api/users      0.772  0.167  0.061
This creates a matrix showing the rate of each status code (200, 400, 500) per endpoint, making it easy to spot which endpoints have elevated error rates and to distinguish client errors (4xx) from server errors (5xx).
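The same breakdown can be produced with pd.crosstab, which has normalization built in. A minimal sketch assuming the same api_logs list:

# Row-normalized status-code counts per endpoint
api_df = pd.DataFrame(api_logs)
error_breakdown_alt = pd.crosstab(api_df['endpoint'], api_df['status_code'], normalize='index').round(3)
print(error_breakdown_alt)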
8. Flag Performance Anomalies with a Rolling Average
Find unusual patterns by comparing current performance against recent historical performance.
anomaly_flags = db_logs.sort_values('timestamp').assign(rolling_mean=lambda x: x['duration_ms'].rolling(window=100, min_periods=10).mean()).assign(is_anomaly=lambda x: x['duration_ms'] > 2 * x['rolling_mean'])
This sorts the logs chronologically, computes a rolling mean over the last 100 operations with rolling(), and then flags rows where the current duration exceeds twice that rolling average. min_periods=10 ensures the calculation only starts once enough data has accumulated.
Truncated output:

This adds an anomaly flag to each database operation, identifying slow operations relative to recent performance rather than a global average.
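To act on the flags, filter the rows where is_anomaly is True. A small usage sketch:

# Inspect the operations flagged as anomalous
slow_ops = anomaly_flags[anomaly_flags['is_anomaly']]
print(slow_ops[['timestamp', 'operation', 'duration_ms', 'rolling_mean']].head())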
9. Optimize DataFrame Data Types for Memory Efficiency
Reduce a DataFrame's memory footprint by downcasting numeric columns.
optimized_df = db_logs.assign(**{c: (pd.to_numeric(db_logs[c], downcast="integer") if pd.api.types.is_integer_dtype(db_logs[c]) else pd.to_numeric(db_logs[c], downcast="float")) for c in db_logs.select_dtypes(include=['int', 'float']).columns})
This selects only the numeric columns of db_logs and replaces them with downcast versions using pd.to_numeric(). Integer columns are tried as int8, int16, and int32 before falling back to int64; float columns are tried as float32 before float64.
Doing this reduces memory usage on large datasets.
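To see the effect, compare memory usage before and after downcasting. A small sketch:

# Memory footprint before vs. after downcasting numeric columns
before = db_logs.memory_usage(deep=True).sum()
after = optimized_df.memory_usage(deep=True).sum()
print(f"{before / 1e6:.2f} MB -> {after / 1e6:.2f} MB")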
10. Calculate Hourly Event Processing Metrics
Monitor pipeline throughput by tracking event volume and user engagement patterns.
pipeline_metrics = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').agg({'event_id': 'count', 'user_id': 'nunique', 'event_type': lambda x: (x == 'purchase').mean()}).rename(columns={'event_id': 'total_events', 'user_id': 'unique_users', 'event_type': 'purchase_rate'}).round(3)
This extracts the hour from the timestamp, groups the events by hour, and computes three key metrics: total event volume with count(), distinct users with nunique(), and the purchase conversion rate with a lambda that computes the fraction of purchase events. The rename() call gives the final columns descriptive names.
Output:

This produces hourly metrics showing event volume, user engagement levels, and conversion rates throughout the day.
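To spot the best-converting hours at a glance, sort by the purchase rate. A small usage sketch:

# Rank hours by purchase conversion rate
print(pipeline_metrics.sort_values('purchase_rate', ascending=False).head())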
Wrapping Up
These one-liners are useful for everyday data engineering tasks. They combine pandas operations, statistical analysis, and data transformation techniques to handle real-world scenarios.
Each pattern can be adapted and extended to your specific requirements while keeping the core logic that makes it effective in production.
Bala Priya C is a developer and technical writer from India. She likes working at the intersection of math, programming, data science, and content creation. Her areas of interest and expertise include DevOps, data science, and natural language processing. She enjoys reading, writing, coding, and coffee! Currently, she's learning and sharing her knowledge with the developer community by authoring tutorials, how-to guides, opinion pieces, and more. She also creates engaging resource overviews and coding tutorials.



