Modern Data Pipeline Implementation with PAASUP DIP: Subway User Statistics Analysis Project #3

Analyzing Urban Transportation Patterns with Seoul Subway Hourly Boarding and Alighting Data

Project Overview

The Seoul subway network is South Korea's largest urban railway system, serving over 7 million citizens daily.

This project implements the 'Rush Hour Pattern Analysis through Hourly Data Acquisition' task that was proposed as a future development plan in a previous blog post.

The goal is to build an intelligent dashboard system that analyzes citizens' movement patterns and traffic congestion by processing over 10 years of subway boarding and alighting data from the Public Data Portal with a modern data lake technology stack.

Table of Contents

  1. Original Data Structure
  2. Data Pipeline Construction
  3. Dashboard Development
  4. Business Insights
  5. Conclusion

📋Original Data Structure

  • Filename: Seoul Subway Line-wise Station-wise Hourly Boarding and Alighting Information.json
  • Source: Public Data Portal
  • Scale: 73,692 records
  • Granularity: Monthly, line-wise, station-wise hourly (00:00-24:00) boarding and alighting counts

The original JSON data contains hourly boarding/alighting passenger counts for each station over 24 hours, consisting of the following key fields:

{
  "USE_MM": "Usage Month",
  "SBWY_ROUT_LN_NM": "Line Name", 
  "STTN": "Subway Station",
  "HR_7_GET_ON_NOPE": "07:00-08:00 Boarding Passengers",
  "HR_7_GET_OFF_NOPE": "07:00-08:00 Alighting Passengers",
  // ... All 24 hourly time slots
}
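
As a quick sanity check on this structure, the raw file can be inspected with plain Python before it enters the pipeline. A minimal sketch (the local filename is illustrative; the actual rows sit under the "DATA" key, which the pipeline code below also relies on):

import json

# Illustrative local filename; the real file is downloaded from the Public Data Portal
with open("subway_hourly_boarding_alighting.json", encoding="utf-8") as f:
    payload = json.load(f)

records = payload["DATA"]                 # the actual rows live under the "DATA" key
print(f"record count: {len(records)}")    # 73,692 in this dataset

# Each record should carry 24 boarding/alighting field pairs: HR_0 ... HR_23
expected = [f"HR_{h}_GET_{d}_NOPE" for h in range(24) for d in ("ON", "OFF")]
sample_keys = {k.upper() for k in records[0]}
missing = [c for c in expected if c not in sample_keys]
print("missing hourly fields:", missing or "none")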

⚙️Data Pipeline Construction

Architecture Design

This project implements a modern, scalable, and performance-optimized data lakehouse architecture. We adopted a multi-layered structure that stores the original JSON data in cloud storage, converts it to Delta Lake format for ACID transactions and schema evolution, and finally serves it to a relational database and BI tools.

The entire data pipeline (excluding visualization) runs through the Apache Spark Operator, ensuring stable and scalable data processing in a distributed environment.


Technology Stack:

  • Data Processing: Apache Spark
  • Data Storage: Delta Lake, PostgreSQL
  • Visualization: Apache Superset
  • Development Environment: Jupyter Notebook
  • Platform: DIP Portal
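
On DIP, the Spark Operator and portal provision most of the runtime configuration, but conceptually the Spark session behind this stack needs Delta Lake support plus access to S3-compatible object storage. A minimal sketch, with placeholder endpoint and credentials:

from pyspark.sql import SparkSession

# Sketch only: in DIP these settings are largely supplied by the Spark Operator.
spark = (
    SparkSession.builder
    .appName("subway-passengers-pipeline")
    # Delta Lake support
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3-compatible object storage (placeholder endpoint and credentials)
    .config("spark.hadoop.fs.s3a.endpoint", "https://object-storage.example.com")
    .config("spark.hadoop.fs.s3a.access.key", "<access_key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<secret_key>")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)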

Data Normalization Process

We transformed the original JSON's wide format data into long format suitable for analysis:

Before Transformation (Wide Format): JSON file or Delta table schema

Usage Year-Month | Line Name | Subway Station | HR_7_GET_ON | HR_7_GET_OFF | HR_8_GET_ON | HR_8_GET_OFF | ...

After Transformation (Long Format): PostgreSQL table schema

CREATE TABLE subway_passengers_time (
    use_ym VARCHAR(6),           -- Usage Year-Month
    line_no VARCHAR(100),        -- Line Name
    station_name VARCHAR(100),   -- Station Name
    time_term VARCHAR(20),       -- Time Period
    pass_in NUMERIC(12),         -- Boarding Passengers
    pass_out NUMERIC(12),        -- Alighting Passengers
    reg_date VARCHAR(8),         -- Registration Date
    PRIMARY KEY (use_ym, line_no, station_name, time_term)
);

Final Data Scale: 1,768,608 records (73,692 source records × 24 hourly slots), spanning 10 years and 4 months
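
The production pipeline performs this unpivot with Spark SQL (shown in the next section). Purely as an illustration of the same wide-to-long idea, the reshaping can also be expressed with Spark's stack() generator; this sketch assumes df_json is the DataFrame built from the raw JSON and relies on Spark's default case-insensitive column resolution:

from pyspark.sql import functions as F

# Build the 24 (label, boarding, alighting) triples for stack()
stack_args = ", ".join(
    f"'{h:02d}시-{h + 1:02d}시', hr_{h}_get_on_nope, hr_{h}_get_off_nope"
    for h in range(24)
)

# One source row becomes 24 rows: (time_term, pass_in, pass_out)
long_df = df_json.select(
    F.col("use_mm").alias("use_ym"),
    F.col("sbwy_rout_ln_nm").alias("line_no"),
    F.col("sttn").alias("station_name"),
    F.col("job_ymd").alias("reg_date"),
    F.expr(f"stack(24, {stack_args}) as (time_term, pass_in, pass_out)"),
)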

Data Pipeline Implementation

The core code of the data pipeline using Spark is as follows:

import json  # needed for parsing the raw JSON payload below

def subway_data_time_overwrite(spark, s3_config, pg_config, json_file):
    """
    Main pipeline for processing subway passenger hourly data
    Processes data in the order: JSON source data → Delta Lake → PostgreSQL
    
    Args:
        spark: Spark session object
        s3_config: S3 configuration (bucket, access_key, secret_key, etc.)
        pg_config: PostgreSQL configuration (host, port, dbname, user, password)
        json_file: JSON filename to process
    """
    
    # Generate PostgreSQL JDBC connection URL
    pg_url = f"jdbc:postgresql://{pg_config['host']}:{pg_config['port']}/{pg_config['dbname']}"
    
    # Set PostgreSQL connection properties
    pg_properties = {
        "user": pg_config['user'],
        "password": pg_config['password'],
        "driver": "org.postgresql.Driver"
    }

    print(f" bucket name is {s3_config['bucket']}")
    
    # Define S3 file paths
    json_source_key = f"datasource/{json_file}"  # Original JSON file path
    json_archive_key = f"archive/{json_file}"    # Archive path after processing
    json_file_path = f"s3a://{s3_config['bucket']}/{json_source_key}"  # Full S3 file path
    
    # Delta Lake table storage path
    delta_table_path = f"s3a://{s3_config['bucket']}/deltalake/subway_passengers_time"
    
    # PostgreSQL table name
    pg_table_name = "subway_passengers_time"
    
    try:    
        # =================================================================
        # Step 1: Read JSON file from S3 and create DataFrame
        # =================================================================
        print(f"Reading JSON file from S3: {json_file_path}...")
        
        # Read as raw text to handle special JSON structure
        # Actual data is stored as an array inside the "DATA" key of JSON
        raw_json_rdd = spark.sparkContext.textFile(json_file_path)
        
        # Combine all lines of RDD into one string
        raw_json_string = "".join(raw_json_rdd.collect())
        
        # Parse JSON string to Python dictionary
        data_dict = json.loads(raw_json_string)
        
        # Extract actual data record list from 'DATA' key
        records = data_dict.get("DATA")
        if not records:
            print("Error: 'DATA' key not found in JSON or is empty.")
            return
        
        # Create Spark DataFrame from extracted record list
        df_json = spark.createDataFrame(records)
        
        # =================================================================
        # Step 2: Save data to Delta Lake (overwrite mode)
        # =================================================================
        print(f"Saving data to Delta Lake table: {delta_table_path}")
        
        # Save data to Delta Lake in overwrite mode
        df_json.write.format("delta").mode("overwrite").save(delta_table_path)
        print("Data saved to Delta Lake table successfully")

       
        # =================================================================
        # Step 3: Move processed JSON file to archive folder
        # =================================================================
        print(f"Moving processed JSON file to archive folder...")
        move_s3_file(s3_config, json_source_key, json_archive_key)

        # =================================================================
        # Step 4: Transform Delta Lake data and load into PostgreSQL
        # =================================================================   
        print(f"Inserting data into PostgreSQL table: {pg_table_name}")
        
        # Implement UNPIVOT operation using UNION ALL to convert hourly columns (hr_0~hr_23) to rows
        # Original data: 1 row with 24 hourly columns
        # After transformation: 24 rows with separate data for each hour
        insert_sql = f"""
                        -- 00:00-01:00 boarding/alighting data
                        select use_mm use_ym               -- Usage Year-Month
                             , sbwy_rout_ln_nm line_no      -- Subway Line Name
                             , sttn station_name            -- Station Name
                             , '00시-01시' time_term        -- Time Period
                             , hr_0_get_on_nope pass_in     -- Boarding Passengers
                             , hr_0_get_off_nope pass_out   -- Alighting Passengers
                             , job_ymd reg_date             -- Registration Date
                          from delta.`{delta_table_path}`
                        union all 
                        -- 01:00-02:00 boarding/alighting data
                        select use_mm use_ym
                             , sbwy_rout_ln_nm line_no
                             , sttn station_name
                             , '01시-02시' time_term
                             , hr_1_get_on_nope pass_in
                             , hr_1_get_off_nope pass_out
                             , job_ymd reg_date
                          from delta.`{delta_table_path}`
                        union all 
                        -- ... (the same SELECT block repeats for hr_2 through hr_23, 24 blocks in total)
                        ...
                    """      
        
        
        # Execute Spark SQL to create transformed DataFrame
        delta_df = spark.sql(insert_sql)
        
        # Write data to PostgreSQL (overwrite mode: delete existing data and load new data)
        delta_df.write \
            .jdbc(url=pg_url,
                  table=pg_table_name,
                  mode="overwrite",
                  properties=pg_properties)
        
        print(f"Data loading to PostgreSQL table {pg_table_name} completed")

        print(f"--- All processing completed successfully! ---")

    except Exception as e:
        print(f"Error occurred during data processing: {e}")
        import traceback
        traceback.print_exc()  # Output detailed error stack trace
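
A hedged usage sketch: the configuration values below are placeholders whose keys follow the docstring above, and the filename is the dataset listed in the data description.

# Placeholder configuration; real values come from the DIP environment / secrets store.
s3_config = {
    "bucket": "subway-data",
    "access_key": "<access_key>",
    "secret_key": "<secret_key>",
}
pg_config = {
    "host": "postgres.example.internal",
    "port": 5432,
    "dbname": "analytics",
    "user": "etl_user",
    "password": "<password>",
}

subway_data_time_overwrite(
    spark,
    s3_config,
    pg_config,
    json_file="Seoul Subway Line-wise Station-wise Hourly Boarding and Alighting Information.json",
)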

Key Implementation Points:

  • Dynamic Column Processing: Dynamically convert the 24 hourly boarding/alighting columns to long format (see the generation sketch after this list)
  • Delta Lake Utilization: Support for ACID transactions and schema evolution
  • Distributed Processing: Efficient processing of large volumes of data with Spark's distributed computing
  • Data Quality: Ensure data integrity through deduplication and aggregation
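
The SELECT blocks elided above ("...") differ only in the hour index, so in practice the full UNION ALL statement can be generated rather than hand-written. A sketch (build_unpivot_sql is a hypothetical helper; delta_table_path is the variable defined in the pipeline function):

def build_unpivot_sql(delta_table_path: str) -> str:
    """Generate the 24-way UNION ALL that turns the hourly columns into rows."""
    selects = []
    for h in range(24):
        selects.append(f"""
            select use_mm use_ym
                 , sbwy_rout_ln_nm line_no
                 , sttn station_name
                 , '{h:02d}시-{h + 1:02d}시' time_term
                 , hr_{h}_get_on_nope pass_in
                 , hr_{h}_get_off_nope pass_out
                 , job_ymd reg_date
              from delta.`{delta_table_path}`""")
    return "\n            union all".join(selects)

# insert_sql = build_unpivot_sql(delta_table_path)
# delta_df = spark.sql(insert_sql)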

📊Dashboard Development

We built two main dashboards using Apache Superset.

Dashboard 1: Detailed Hourly Congestion Analysis

Objective: Propose optimal travel times through analysis of hourly boarding/alighting patterns for specific periods, lines, and stations

Key Features:

  • Dynamic Filtering: Multiple selection for inquiry period, line names, and station names
  • Real-time Analysis: Immediate chart updates based on user selections


Core Charts:

  1. Hourly Average Boarding/Alighting Trends (Line Chart)

    • Visualize hourly average boarding/alighting patterns for selected conditions
    • Identify rush hours and off-peak periods
  2. TOP 10 Stations by Boarding/Alighting Volume (Bar Chart)

    • Ranking of the 10 busiest stations under selected conditions
    • Identify major transportation hub stations
  3. Hourly/Station-wise Congestion Heatmap (Heatmap)

    • At-a-glance view of time-station congestion levels in a 2D matrix for selected station conditions
    • Intuitive representation of congestion levels through color intensity
  4. Rush Hour Concentration Analysis (Bar Chart)

    • Passenger distribution between rush hours (07-09, 18-20) vs other time periods for selected station conditions
    • Quantitative analysis of traffic concentration (a query sketch follows this list)
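
As an illustration of the metric behind chart 4, the rush-hour share reduces to a single query against subway_passengers_time (the time_term values follow the 'HH시-HH시' convention produced by the pipeline; the dashboard filters would add WHERE clauses for period, line, and station):

-- Sketch: boarding share of rush-hour slots (07-09, 18-20) vs. all other hours
SELECT CASE
         WHEN time_term IN ('07시-08시', '08시-09시', '18시-19시', '19시-20시')
           THEN 'rush_hour'
         ELSE 'off_peak'
       END AS period_type,
       SUM(pass_in) AS boardings,
       ROUND(100.0 * SUM(pass_in) / SUM(SUM(pass_in)) OVER (), 1) AS share_pct
FROM subway_passengers_time
GROUP BY 1;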

Dashboard 2: Transfer Station-Centered Floating Population Analysis

Objective: In-depth analysis of major transfer stations' role as transportation hubs and inter-line connectivity

Key Features:

  • Dynamic Filtering: Multiple selection for inquiry period and transfer stations
  • Real-time Analysis: Immediate chart updates based on user selections


Core Insights:

  1. Top 15 Transfer Stations in South Korea (Bar Chart)

    • Major transfer station rankings based on total ridership over the entire period
    • Identify core nodes of national transportation infrastructure
  2. TOP 10 Transfer Stations by Morning Alighting Volume (Table)

    • Analysis of transfer stations with concentrated alighting during 07-09 hours
    • Identify transfer stations connecting major business districts
  3. Hourly Net Inflow Population Analysis for Transfer Stations (Line Chart)

    • Net inflow (alighting passengers - boarding passengers) analysis for selected transfer station conditions
      Positive (+): Destination characteristics (business districts)
      Negative (-): Origin characteristics (residential areas)
    • Quantitative analysis of functional characteristics (residential/business) of selected transfer stations (see the query sketch after this list)
  4. Line-wise Contribution at Transfer Stations (Pie Chart)

    • Usage proportion of each line within selected transfer stations
    • Distinguish between main lines and auxiliary lines
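
Likewise, the net-inflow metric from chart 3 is a simple aggregation over the same table; a sketch, with placeholder values standing in for the dashboard's transfer-station filter:

-- Sketch: hourly net inflow (alighting - boarding) for selected transfer stations
SELECT station_name,
       time_term,
       SUM(pass_out) - SUM(pass_in) AS net_inflow
FROM subway_passengers_time
WHERE station_name IN ('<transfer station 1>', '<transfer station 2>')  -- placeholder filter
GROUP BY station_name, time_term
ORDER BY station_name, time_term;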

📈Business Insights

  1. Rush Hour Concentration Phenomenon

    • About 40% of total passengers are concentrated in the 07:00-09:00 and 18:00-20:00 windows
    • Concentrated alighting at business centers like Gangnam Station and Jamsil Station
  2. Transfer Station Hub Effect

    • The top 5 transfer stations handle 25% of total traffic volume
    • Differentiated usage patterns according to line-specific characteristics
  3. Regional Characteristic Analysis

    • Residential areas: Morning boarding, evening alighting concentration
    • Business areas: Morning alighting, evening boarding concentration

🚀Conclusion

Through this project, we confirmed that simple boarding and alighting data can become a powerful tool for understanding citizens' lifestyle patterns and urban structure.

By utilizing modern data pipelines and visualization tools, we were able to maximize the value of public data and derive insights that can improve citizens' transportation convenience.