Modern Data Pipeline Implementation with PAASUP DIP: Subway User Statistics Analysis Project #1

This blog introduces a project using the PAASUP DIP to implement a modern data pipeline that processes and analyzes over 10 years of Seoul subway ridership data through Apache Spark, Delta Lake, and PostgreSQL, combined with Apache Superset dashboards for multi-dimensional data visualization.

Seoul Subway Usage Analysis Project

Introduction

In this first part of the series, we look at the subway, one of the most important public transportation systems in modern cities. By analyzing the usage patterns of the countless citizens who ride it every day, we can understand the flow of the city and citizens' lifestyle patterns.

This project aims to collect and analyze subway ridership data by line and day to derive meaningful insights. Through this analysis, we will provide results that can be utilized in various fields such as transportation policy development, commercial area analysis, and urban planning.

1. Project Overview
2. Data Visualization
3. Project Environment Setup
4. Data Pipeline Design and Implementation
5. Project Results and Implications

Project Overview

Objective: Building a comprehensive analysis dashboard utilizing subway ridership statistics

Data Sources

  • Subway Ridership Statistics (10 years and 4 months of data, Source: Public Data Portal)
  • Subway Station Information (Source: Public Data Portal)

Technology Stack:

  • Data Processing: Apache Spark
  • Data Storage: Delta Lake, PostgreSQL
  • Visualization: Apache Superset
  • Development Environment: Jupyter Notebook
  • Platform: DIP Portal

Data Visualization

Data visualization is a key element that helps us intuitively understand complex data. In this project, we implemented 3 dashboards using Apache Superset.

Dashboard 1: Station Ridership Trends

Key Features:

  • Time Series Analysis: Monthly ridership trend visualization
  • Dynamic Filters: Dynamic filtering by line and station
  • Interaction: Detailed analysis through drill-down

Key Insights:

  • Sharp decline in ridership from early 2020 due to COVID-19 impact, with low ridership continuing until 2022
  • Gradual increase in ridership from late 2022, with stable usage patterns established after 2023
  • Confirmed seasonal characteristics with regular minor fluctuations

Dashboard 2: Daily Average Ridership by Line

Key Features:

  • Density Analysis: Overview of ridership density by line
  • Dynamic Filters: Filtering by line and usage month
  • Ranking Analysis: Dynamic station ranking sorting capability

Key Insights:

  • Confirmed downtown-concentrated transportation usage patterns such as Seoul Station on Line 1 and Gangnam Station on Line 2
  • Clear ridership gap between major transfer stations and regular stations
  • Confirmed normal subway usage patterns with similar boarding and alighting passenger numbers at most stations

Dashboard 3: Ridership by Day of Week

Key Features:

  • Density Analysis: Overview of ridership ratios by day of week, by line and station
  • Dynamic Filters: Filtering by line and usage year
  • Ranking Analysis: Sorting by day of week ranking
  • Other: Hourly analysis would be possible if the raw data included usage hours, but since only daily aggregates are available, time-of-day analysis is not covered here

Key Insights:

  • Friday shows the highest usage rate among weekdays, with other weekdays showing even distribution (TGIF phenomenon)
  • Weekday vs Weekend gap: Weekend (Saturday/Sunday) usage rates are significantly lower than weekdays, confirming commuting/school-centered usage patterns
  • Minimal Monday blues effect: Similar levels to Tuesday

Project Environment Setup

All environments required for this project can be built with a few simple operations through the DIP (Data Intelligence Platform) portal.

Required Service Catalog

  • Kubeflow: Workflow management linked with Jupyter Notebook
  • PostgreSQL: Data storage and management
  • Apache Superset: Data visualization and dashboard

Step-by-Step Environment Setup Guide

Step 1: Login and Project Selection

The DIP administrator first creates the project and a manager account. After logging in with that account, open the list of projects assigned to the manager and select the catalog creation menu.
01-DIP login
02-create-catalog

Step 2: Kubeflow Catalog Creation

  • Select Kubeflow from the catalog creation menu
  • Enter catalog version and name
  • Click create button to generate catalog
    03-kubeflow-create

Step 3: PostgreSQL Catalog Creation

  • Select PostgreSQL catalog
  • Enter catalog version and name
  • Click create button to build database environment
    04-create-postgres

Step 4: Superset Catalog Creation

  • Select Apache Superset catalog
  • Enter catalog version and name
  • Click create button to build visualization environment
    05-superset-create

Step 5: Catalog Status Verification

On the catalog inquiry screen, you can monitor the deployment and service status of each catalog, grouped by project, in real time.
06-catalog-list

Step 6: PostgreSQL Connection Information Verification

The pg-host and pg-port information for accessing PostgreSQL from applications can be found in the detailed information screen of the corresponding catalog.
07-pg-host-info

Step 7: Jupyter Notebook Environment Creation

To build a JupyterLab environment for Python code development, create a notebook instance through the Kubeflow service link.
08-create-notebook
09-create-notebook2

Step 8: Development Environment Access

Clicking the Connect link in Jupyter Notebook provides access to an environment where you can develop necessary applications using Python code.
10-connect-jupyter
11-jupyterlab

Data Pipeline Design and Implementation

Step 1: Data Collection and Preprocessing

The raw data in CSV format collected from the Public Data Portal had the following characteristics:

  • Data Scale: Daily aggregated data for 10 years and 4 months
  • Source Data Information:
    1. Boarding/alighting passenger counts by line/station/day (CARD_SUBWAY_MONTH_{WK_YM}.csv) - all monthly files
      subway_data
    2. Subway station information (Seoul Transportation Corporation_Subway Station Information by Line.csv) - latest file only
      subway_station
  • Data Quality Issues:
    1. Inconsistent line names and station names between the two source datasets
    2. Occasional Korean character corruption: resolved by re-saving the files as UTF-8 CSV in Excel
    3. Spark errors when special characters exist in the header row (column labels): resolved by removing them before saving (a programmatic alternative is sketched below)
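If you would rather handle issues 2 and 3 in code instead of re-saving the files in Excel, a minimal sketch along the following lines could read the CSV with an explicit encoding and sanitize the header labels before writing to Delta. The helper name and the character whitelist are assumptions for illustration, not part of the original notebooks:

import re

def read_and_sanitize_csv(spark, csv_s3_path):
    # Read with an explicit encoding to avoid Korean character corruption
    df = spark.read.option("header", "true").option("encoding", "UTF-8").csv(csv_s3_path)
    # Delta rejects column names containing characters such as spaces, commas or parentheses,
    # so keep only letters, digits, Hangul and underscores in the header labels
    cleaned = [re.sub(r"[^0-9A-Za-z가-힣_]", "_", c) for c in df.columns]
    return df.toDF(*cleaned)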

Step 2: PostgreSQL Data Mart Model Creation

We configured a PostgreSQL data model optimized for analysis:

-- Create subway passenger information table
CREATE TABLE subway_passengers (
    use_date varchar(8) not null, -- Usage date
    line_no varchar(100) not NULL, -- Line name
    station_name varchar(100) not NULL, -- Station name
    pass_in numeric(13) NULL, -- Boarding passengers
    pass_out numeric(13) NULL, -- Alighting passengers
    reg_date varchar(8) null -- Registration date
);
create index subway_passengers_idk1 on subway_passengers (use_date, line_no, station_name);

-- Create subway station information table
CREATE TABLE subway_info (
    station_name varchar(100) not NULL, -- Station name
    line_no varchar(100) not NULL, -- Line name
    station_cd varchar(10), -- Station code (for sorting)
    station_cd_external varchar(10), -- Station code (external)
    station_ename varchar(100), -- Station name (English)
    station_cname varchar(100), -- Station name (Chinese)
    station_jname varchar(100) -- Station name (Japanese)
);
create index subway_info_idk1 on subway_info (station_name, line_no, station_cd);
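
With the two tables in place, a quick sanity check (and an example of the kind of query behind the Superset dashboards) is to join them and compute the average daily boardings per station for each line. The query below is an illustrative sketch, not the exact dataset definition used in the dashboards:

-- Illustrative query: average daily boardings per station, by line
select p.line_no,
       round(avg(p.pass_in), 0) as avg_daily_boardings
  from subway_passengers p
  join subway_info i
    on i.line_no = p.line_no
   and i.station_name = p.station_name
 group by p.line_no
 order by avg_daily_boardings desc;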

Step 3: Spark-based Data Transformation (CSV → Delta → PostgreSQL)

Using DIP's Spark cluster, we performed the following data transformation tasks:

load_subway_passengers.ipynb: Subway passenger information data transformation

  1. Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import lit, substring, col
from delta.tables import DeltaTable 
import os
from dotenv import load_dotenv
import psycopg2 
import boto3
from botocore.exceptions import ClientError
  2. Spark Session creation function: Common Library
def create_spark_session(app_name, notebook_name, notebook_namespace, s3_access_key, s3_secret_key, s3_endpoint):
    conf = SparkConf()
    # Spark driver, executor configuration
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.instances", "1")
    # Omitted for brevity... (typical S3/Delta settings are sketched after this function)
    
    # JAR packages required for S3, Delta, PostgreSQL usage
    jar_list = ",".join([
        "org.apache.hadoop:hadoop-common:3.3.4",
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "com.amazonaws:aws-java-sdk:1.11.655", 
        "io.delta:delta-spark_2.12:3.3.1",
        "org.postgresql:postgresql:42.7.2"
    ])
    conf.set("spark.jars.packages", jar_list)
    # Omitted for brevity...
    
    # Build SparkSession
    spark = SparkSession.builder \
        .appName(app_name) \
        .config(conf=conf) \
        .getOrCreate()    
    
    return spark
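For reference, the configuration omitted above typically includes Delta Lake and S3A settings along the lines below. This is a general sketch of the kind of settings such a setup needs (values come from the function arguments); the exact configuration used in the DIP environment may differ:

    # Inside create_spark_session, where the configuration is omitted above:
    # Enable Delta Lake SQL support
    conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3-compatible object storage access (endpoint, credentials, path-style addressing)
    conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    conf.set("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    conf.set("spark.hadoop.fs.s3a.access.key", s3_access_key)
    conf.set("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
    conf.set("spark.hadoop.fs.s3a.path.style.access", "true")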
  3. Function to save CSV to Delta and move S3 CSV file: Common Library
def move_s3_file(s3_config, source_key, dest_key):
    """
    Move files within S3 using boto3 (copy then delete).
    """
    s3_client = boto3.client(
        's3',
        aws_access_key_id=s3_config['access_key'],
        aws_secret_access_key=s3_config['secret_key'],
        endpoint_url=s3_config['endpoint'],
        verify=False # May be needed when using self-signed certificates
    )
    try:
        copy_source = {'Bucket': s3_config['bucket'], 'Key': source_key}
        print(f"Copying S3 object from {source_key} to {dest_key}...")
        s3_client.copy_object(CopySource=copy_source, Bucket=s3_config['bucket'], Key=dest_key)
        print(f"Deleting source S3 object: {source_key}...")
        s3_client.delete_object(Bucket=s3_config['bucket'], Key=source_key)
        print(f"Successfully moved {source_key} to {dest_key}.")
        return True
    except ClientError as e:
        print(f"Error moving S3 file: {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred during S3 move: {e}")
        return False
  4. Main data processing function
    Implemented for incremental loading (append) and reprocessing, since the source CSV files arrive in monthly units
def subway_data_append(spark, s3_config, pg_config):
    """
    Main pipeline processing CSV --> DELTA --> POSTGRESQL
    """
    pg_url = f"jdbc:postgresql://{pg_config['host']}:{pg_config['port']}/{pg_config['dbname']}"
    pg_properties = {
        "user": pg_config['user'],
        "password": pg_config['password'],
        "driver": "org.postgresql.Driver"
    }

    print(f" bucket name is {s3_config['bucket']}")
    csv_source_key = f"datasource/CARD_SUBWAY_MONTH_{WK_YM}.csv"
    csv_archive_key = f"archive/CARD_SUBWAY_MONTH_{WK_YM}.csv"
    csv_s3_path = f"s3a://{s3_config['bucket']}/{csv_source_key}"
    delta_s3_path = f"s3a://{s3_config['bucket']}/{s3_config['delta_path']}"
    
    try:    
        print(f"\n--- Starting processing for {pg_config['table']} during '{WK_YM}' ---")

        # Step 1: Read S3 CSV
        print(f"Reading CSV from {csv_s3_path}...")
        csv_df = spark.read.option("header", "true").option("overwriteSchema", "true").csv(csv_s3_path)
        csv_df.createOrReplaceTempView("subway_data")
        print(f"CSV for {WK_YM} loaded. Count: {csv_df.count()}")
        csv_df.show(5)

        # Step 2: Delete and insert to Delta Lake (using Spark SQL)
        print(f"Deleting old data for {WK_YM} from Delta table: {delta_s3_path}")
        
        if DeltaTable.isDeltaTable(spark, delta_s3_path):
            print(f" Delta Table {delta_s3_path} exists") 
            delete_delta_sql = f"delete from delta.`{delta_s3_path}` where `사용일자` like '{WK_YM}'||'%' "
            delta_mode = "append"
            try:
                spark.sql(delete_delta_sql)
                print(f"Deletion from Delta complete. Inserting new data for {WK_YM}...")
            except Exception as e:
                print(f"Skip on error occurred during Delta DELETE, even though table exists: {e}")
                raise         
        else:
            print(f" Delta Table {delta_s3_path} does not exist") 
            delta_mode = "overwrite"
        
        # Write using DataFrame API
        csv_df.write.format("delta").mode(delta_mode).save(delta_s3_path)
        print(f"Successfully wrote data to Delta table {delta_s3_path}.")
        
        # Step 3: Move S3 CSV file
        print(f"Archiving S3 CSV file...")
        move_s3_file(s3_config, csv_source_key, csv_archive_key)
        
        # Step 4: Delete and insert to PostgreSQL   
        pg_table_name = pg_config['table']
        print(f"Deleting old data for {WK_YM} from PostgreSQL table: {pg_table_name}")        
        conn = None
        cursor = None
        conn = psycopg2.connect(
            host=pg_config['host'],
            port=pg_config['port'],
            dbname=pg_config['dbname'],
            user=pg_config['user'],
            password=pg_config['password']
        )
        cursor = conn.cursor()

        delete_sql = f" delete from {pg_table_name} where use_date like '{WK_YM}'||'%' "
        print(f"Executing DELETE query: {delete_sql}")
        cursor.execute(delete_sql)
        deleted_rows = cursor.rowcount
        if cursor: cursor.close()
        if conn: 
            conn.commit()
            conn.close()
        print(f"Successfully executed DELETE query. {deleted_rows} rows deleted.")    

        print(f"Inserting data for {WK_YM} into PostgreSQL table: {pg_table_name}")
        # Spark SQL query to process data
        insert_sql = f"""
            select `사용일자` use_date
               ,`노선명` line_no
               ,`역명` station_name
               ,cast(`승차총승객수` as int) pass_in
               ,cast(`하차총승객수` as int) pass_out
               ,substring(`등록일자`,1,8) reg_date
            from delta.`{delta_s3_path}`
            where `사용일자` like '{WK_YM}'||'%'
        """      
        print(insert_sql) 
        delta_df = spark.sql(insert_sql)
        delta_df.write \
            .jdbc(url=pg_url,
                  table=pg_table_name,
                  mode="append",
                  properties=pg_properties)
        print(f"Successfully wrote data to PostgreSQL table {pg_table_name}.")

        print(f"--- Processing for {WK_YM} completed successfully! ---")

    except Exception as e:
        print(f"An error occurred during processing {WK_YM}: {e}")
        import traceback
        traceback.print_exc()
  5. Main execution block
# Main execution block
if __name__ == "__main__":
    # Read environment variables from .env file in Pod     
    dotenv_path = './config/.env'
    load_dotenv(dotenv_path=dotenv_path)    
    
    # APP configuration variables
    APP_NAME = 'load_subway_passengers'
    NOTEBOOK_NAME = "test"
    NOTEBOOK_NAMESPACE = "demo01-kf"
    WK_YM = os.getenv("WK_YM", "202401")  # Target year-month (YYYYMM) of the monthly CSV; "202401" is an illustrative default
    S3_CONFIG = {
        "access_key": os.getenv("S3_ACCESS_KEY"),
        "secret_key": os.getenv("S3_SECRET_KEY"),
        "endpoint": os.getenv("S3_ENDPOINT_URL"),
        "bucket": os.getenv("S3_BUCKET_NAME"),
        "delta_path": "deltalake/subway_passengers" # Delta target table path in S3 bucket
    }

    PG_CONFIG = {
        "host": os.getenv("PG_HOST"),
        "port": os.getenv("PG_PORT", "5432"), # Default value 5432
        "dbname": os.getenv("PG_DB"),
        "user": os.getenv("PG_USER"),
        "password": os.getenv("PG_PASSWORD"),
        "table": "subway_passengers" # PostgreSQL target table name
    }
    
    # Create SparkSession
    spark = create_spark_session(
        app_name=APP_NAME,
        notebook_name=NOTEBOOK_NAME,
        notebook_namespace=NOTEBOOK_NAMESPACE,
        s3_access_key=S3_CONFIG['access_key'],
        s3_secret_key=S3_CONFIG['secret_key'],
        s3_endpoint=S3_CONFIG['endpoint']
    )   
    
    # Execute data processing
    subway_data_append(spark, S3_CONFIG, PG_CONFIG)
    
    # Stop SparkSession
    spark.stop()
    print("Spark session stopped.")

load_subway_info.ipynb: Subway station information data transformation

  1. Import necessary libraries: Same as above
  2. Spark Session creation function: Common Library
  3. Function to save CSV to Delta and move S3 CSV file: Common Library
  4. Main data processing function: overwrites the target in full, since this is master (reference) data
def subway_info_overwrite(spark, s3_config, pg_config): 
    """
    Main pipeline processing CSV --> DELTA --> POSTGRESQL
    """
    pg_url = f"jdbc:postgresql://{pg_config['host']}:{pg_config['port']}/{pg_config['dbname']}"
    pg_properties = {
        "user": pg_config['user'],
        "password": pg_config['password'],
        "driver": "org.postgresql.Driver"
    }

    print(f" bucket name is {s3_config['bucket']}")
    csv_source_key = f"datasource/서울교통공사_노선별 지하철역 정보.csv"
    csv_archive_key = f"archive/서울교통공사_노선별 지하철역 정보.csv"
    csv_s3_path = f"s3a://{s3_config['bucket']}/{csv_source_key}"
    delta_s3_path = f"s3a://{s3_config['bucket']}/{s3_config['delta_path']}"
    
    try:    
        print(f"\n--- Starting processing for {pg_config['table']} ---")

        # Step 1: Read S3 CSV
        print(f"Reading CSV from {csv_s3_path}...")
        csv_df = spark.read.option("header", "true").option("overwriteSchema", "true").csv(csv_s3_path)
        csv_df.createOrReplaceTempView("subway_info")
        csv_df.show(5)
        
        print(f"CSV loaded. Count: {csv_df.count()}")

        # Step 2: Save to Delta Lake (full overwrite)
        csv_df.write.format("delta").mode("overwrite").save(delta_s3_path)
        print(f"Successfully wrote data to Delta table {delta_s3_path}.")
        
        # Step 3: Move S3 CSV file
        print(f"Archiving S3 CSV file...")
        move_s3_file(s3_config, csv_source_key, csv_archive_key)       

        # Step 4: Overwrite PostgreSQL table
        pg_table_name = pg_config['table']

        print(f"Inserting data into PostgreSQL table: {pg_table_name}")
        # Spark SQL query to process data
        insert_sql = f"""
            select `전철역명` station_name
               ,`전철역코드` station_cd
               ,`전철명명_영문` station_ename
               ,`전철명명_중문` station_cname
               ,`전철명명_일문` station_jname
               ,`외부코드` station_cd_external
               , case when substring(`호선`,1,1) = '0' then substring(`호선`,2,length(`호선`)-1) 
                      when `호선` = '공항철도' then '공항철도 1호선' 
                      when `호선` = '수인분당선' then '수인선'
                      when `호선` = '신분당선' then '분당선'
                      when `호선` = '우이신설경전철' then '우이신설선' 
                      else `호선` end line_no
            from delta.`{delta_s3_path}`
        """      
        print(insert_sql) 
        delta_df = spark.sql(insert_sql)
        delta_df.write \
            .jdbc(url=pg_url,
                  table=pg_table_name,
                  mode="overwrite",
                  properties=pg_properties)
        print(f"Successfully wrote data to PostgreSQL table {pg_table_name}.")

        print("--- Processing completed successfully! ---")

    except Exception as e:
        print(f"An error occurred during processing: {e}")
        import traceback
        traceback.print_exc()
  5. Main execution block
# Main execution block
if __name__ == "__main__":
    # Read environment variables from .env file in Pod     
    dotenv_path = './config/.env'
    load_dotenv(dotenv_path=dotenv_path)    
    
    # APP configuration variables
    APP_NAME = 'load_subway_info'
    NOTEBOOK_NAME = "test"
    NOTEBOOK_NAMESPACE = "demo01-kf"    
    S3_CONFIG = {
        "access_key": os.getenv("S3_ACCESS_KEY"),
        "secret_key": os.getenv("S3_SECRET_KEY"),
        "endpoint": os.getenv("S3_ENDPOINT_URL"),
        "bucket": os.getenv("S3_BUCKET_NAME"),
        "delta_path": "deltalake/subway_info" # Delta target table path in S3 bucket
    }

    PG_CONFIG = {
        "host": os.getenv("PG_HOST"),
        "port": os.getenv("PG_PORT", "5432"), # Default value 5432
        "dbname": os.getenv("PG_DB"),
        "user": os.getenv("PG_USER"),
        "password": os.getenv("PG_PASSWORD"),
        "table": "subway_info" # PostgreSQL target table name
    }
    
    # Create SparkSession
    spark = create_spark_session(
        app_name=APP_NAME,
        notebook_name=NOTEBOOK_NAME,
        notebook_namespace=NOTEBOOK_NAMESPACE,
        s3_access_key=S3_CONFIG['access_key'],
        s3_secret_key=S3_CONFIG['secret_key'],
        s3_endpoint=S3_CONFIG['endpoint']
    )   
    
    # Execute data processing
    subway_info_overwrite(spark, S3_CONFIG, PG_CONFIG)
    
    # Stop SparkSession
    spark.stop()
    print("Spark session stopped.")

Key Cleansing Tasks:

  • Column Name Standardization: Changed Korean labels to English columns when transferring from Delta to PostgreSQL to ensure system compatibility
  • Data Consistency: Resolved inconsistencies between subway ridership data and subway station information data
    • Station name standardization (handling spaces and special characters; see the sketch below)
    • Line name unification (consistent notation)
  • Data Type Optimization: Appropriate data type conversion for numeric columns
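
As an illustration of the station-name standardization mentioned above, a cleanup along the following lines can be applied on the Spark side before joining the two datasets. The specific patterns handled here (trailing parenthetical suffixes and surrounding whitespace) are assumptions for the example, not an exhaustive list of the rules applied in the project:

from pyspark.sql.functions import col, regexp_replace, trim

def standardize_station_name(df, column="station_name"):
    # Drop trailing parenthetical suffixes such as "(2호선)" and trim surrounding whitespace
    cleaned = regexp_replace(col(column), r"\([^)]*\)$", "")
    return df.withColumn(column, trim(cleaned))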

Project Results and Implications

Key Analysis Results

1. Usage Patterns by Line

  • Lines 2 and 9 show the highest usage rates
  • Lines centered around Gangnam-gu recorded relatively high ridership
  • Steady increase in usage rates for metropolitan lines such as Bundang Line and Gyeongui Line

2. Station-specific Usage Characteristics

  • Major commercial hub stations such as Gangnam, Hongik University, and Konkuk University recorded the highest usage rates
  • Transfer stations showed 2-3 times higher ridership compared to single-line stations
  • Confirmed stable usage patterns for stations near residential areas

3. Period-wise Trend Analysis

  • Overall 10-15% increase in ridership over the past 3 years
  • Temporary decrease in 2020-2021 due to COVID-19 impact followed by rapid recovery
  • Seasonally higher ridership in spring and fall

4. Day-of-week Usage Characteristics

  • Weekend (Saturday-Sunday) ridership decreased by about 25-30% compared to weekdays (Monday-Friday)
  • Friday recorded the highest usage rate among weekdays, reflecting weekend leisure activity patterns
  • Monday showed slightly lower usage rates compared to other weekdays

Business Insights

Transportation Policy Perspective

  • Need for differentiated operational strategies considering usage rate gaps between lines
  • Prioritization of infrastructure expansion centered on major transfer stations
  • Exploration of flexible train scheduling reflecting day-of-week patterns

Commercial Area Analysis Perspective

  • Evaluation of commercial potential using floating population data by station area
  • Analysis of correlations between day-of-week usage patterns and sales by business type
  • Objective indicators for new store location selection

Urban Planning Perspective

  • Assessment of development potential in areas around high-usage lines
  • Exploration of urban regeneration and revitalization plans for low-usage sections
  • Data-based evidence for determining transportation infrastructure investment priorities

Technical Achievements

Platform Utilization Effects

  • Rapid environment setup through DIP portal (80% time reduction compared to previous methods)
  • Improved operational efficiency with Kubeflow-based automated pipelines
  • Quick monitoring dashboard construction using Superset

Scalability Assurance

  • Foundation for integrated analysis with other transportation data (bus, taxi, etc.)
  • Expandable to real-time streaming data processing architecture
  • Preparation for predictive analysis through machine learning model application

Future Development Directions

Short-term Plans

  • Analysis of commuting patterns through hourly data acquisition
  • Expansion of detailed station-wise analysis and strengthening regional characteristic analysis
  • Addition of detailed seasonal and monthly trend analysis

Medium to Long-term Plans

  • Correlation analysis with external factors such as weather and events
  • Construction of ridership prediction system using AI/ML models
  • Development of policy simulation and decision support systems

Conclusion

Through this project, we confirmed that subway usage data is an important indicator for understanding urban lifestyle patterns and economic activities beyond simple transportation information. By utilizing the integrated environment of the DIP portal, we were able to build an advanced data analysis system while minimizing technical complexity.

These analysis results ultimately provide meaningful data-based insights that can contribute to improving citizens' quality of life and creating a better urban environment.
