My first ETL pipeline
I have always wanted to understand the process in more depth when it has come to data pipelines. Therefore I decdided to build my own, this is that pipeline
Objective
Create a data pipeline that ingests user data via an API, processes and stores it, and then retrieves it in a serialized format.
Components
- Data Source: Random API for fake user data
- Python & Pandas: For programming and data manipulation.
- Redis: Caching recent data for quick access.
- Postgres: Long-term data storage.
- FastAPI For an API endpoint for data retrieval
- Docker: Containerization of the entire pipeline.
Steps
- Data Ingestion:
- Python script to fetch data random user data from an API.
- Validate the data before processing.
- Pandas for data cleaning and transformation.
- Caching Layer:
- Redis setup for caching recent User data and set a TTL.
- Python logic for data retrieval from Redis and Postgres.
- Data Storage:
- Design and implement a Postgres database schema for the user data.
- Make sure PII is hashed before putting into storage
- Store processed data into Postgres.
- Data Retrieval:
- API endpoint (e.g., using FastAPI) for data retrieval.
- Dockerization:
- Dockerfile for the Python application.
- Docker Compose for orchestrating Redis and Postgres services.
- Testing and Deployment:
- Unit tests for pipeline components.
Learning Outcomes
My first learning here was how to structure a project correctly, I normally just dumped my main files in root
but here I created a proper structure that ultimately helped in the long run.
In this project it was the first time I have used GitHub Actions which served again very helpful when making pull requests.
The structure of my pipeline changed a few times, I had every action in it's own module which at the time I thought was the best way. After some advice from better developers this could have been a pain in the ass to maintain if my pipeline was ever to grow in complexity. I consolidated the E / T processes into one file, and kept the L process in it's own module for ease.
I also learnt about salting in this process too, when it came to hashing PII data this was how I was salting my PII data
import hashlib
import random
import string
import struct
def hash_pii(pii: str) -> str:
"""
Function for Salting PII information
Hashes PII with salting
Args:
pii: string of sensitive information to be hashed
Returns:
Hashed PII information ready for storage
Raises:
ValueError: If the input is not a valid string or is empty.
"""
if not isinstance(pii, str) or not pii:
raise ValueError("Invalid input: PII must be a non-empty string.")
hash_object = hashlib.sha512(
(pii + "".join(random.choices(string.ascii_letters, k=30))).encode("ascii")
).digest()
number = struct.unpack(">Q", b"\x00" + hash_object[:7])[0]
return str(number)
How to test the project
Clone the repo
git clone https://github.com/mrpbennett/etl-pipeline.git
cd
into the cloned repo and run docker compose up
docker compose up
Then head over to the URL to access the front end to see where the data is stored
http://120.0.0.1:5173