Optimizing API Gateway Log Real-time Analysis Pipeline Using Apache Flink

A case study on streamlining a real-time log analysis pipeline by pre-processing Kafka messages with Apache Flink, reducing StarRocks query load by 90% and improving dashboard response time by 70%

Introduction

In our previous blog post, we introduced a monitoring system that collects PAASUP DIP's API Gateway logs in real time, stores them in StarRocks, and visualizes them using Apache Superset.

The previous architecture used Kafka's StarRocks Sink Connector to load API Gateway logs directly into the kong_log_events table without any filtering or processing. This approach led to the following issues:

  • Unnecessary data storage: Rows and columns not needed for Access Log analysis were included
  • Real-time query overhead: Dashboards refreshing every few seconds executed complex regex (REGEXP_EXTRACT) and string functions (SPLIT_PART) each time
  • System resource waste: The same parsing logic was repeatedly performed with every query

In this blog post, we share our experience of streamlining the real-time log analysis pipeline by leveraging an Apache Flink cluster to filter and process Kafka messages before storing them in the target database.


1. Architecture Comparison

1.1 Previous Architecture (StarRocks Sink Connector)

[Screenshot: previous architecture (StarRocks Sink Connector)]

Issues:

  • Parsing the message column with every dashboard query
  • Repeated execution of 7+ REGEXP_EXTRACT and SPLIT_PART functions
  • Storage of unnecessary log data (stderr, API Gateway's own health check events, etc.)

1.2 Improved Architecture (Flink Pipeline)

[Screenshot: improved architecture (Flink pipeline)]

Improvements:

  • Parsing performed only once in Flink, storing cleansed data in StarRocks
  • Superset queries simplified to basic column retrieval
  • Unnecessary data filtered at the Flink level (increased storage efficiency)

2. Development Environment

2.1 Environment List

| Component | Version/Tool | Purpose |
| --- | --- | --- |
| Apache Flink | 2.01 | Real-time stream processing |
| Flink SQL Gateway | REST API | Execute Flink SQL from Python |
| Apache Kafka | 4.0 | Message queue |
| StarRocks | 4.0.4 | OLAP database |
| Python | 3.0+ | Flink SQL Gateway client |
| Jupyter Notebook | - | Pipeline development/testing |
| Apache Superset | 4.1.1 | Dashboard BI tool |

The flink-kubernetes-operator must first be created by the administrator in the DIP cluster catalog.
[Screenshot: flink-kubernetes-operator in the DIP cluster catalog]

The project manager creates a Flink cluster from the DIP project catalog creation menu.

[Screenshot: creating a Flink cluster from the DIP project catalog]

By clicking the view button for flink-sql-gateway in the project catalog query menu, you can retrieve the flink-sql-gateway URL, Catalog User, and Catalog User Password. This information is used when building the real-time data pipeline.

[Screenshot: flink-sql-gateway URL and catalog user credentials]
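
The values retrieved here are referenced throughout the pipeline code below. A minimal configuration sketch; every value is a hypothetical placeholder, so substitute the URL and credentials shown in your own catalog view:

# Hypothetical placeholders; replace with the values from the DIP project catalog.
GATEWAY_URL      = "http://flink-sql-gateway.demo01.svc.cluster.local:8083"
SOURCE_TOPIC     = "kong-log-events"
BOOTSTRAP_SERVER = "kafka-bootstrap.demo01.svc.cluster.local:9093"
GROUP_ID         = "flink-kong-log-etl"
KAFKA_USER       = "<kafka-user>"
KAFKA_PASSWORD   = "<kafka-password>"
SR_TABLE_NAME    = "KONG_ACCESS_LOG"
SR_USERNAME      = "<starrocks-user>"
SR_PASSWORD      = "<starrocks-password>"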


3. Building Real-time Data Pipeline

We implemented a Python client that calls the Flink SQL Gateway REST API. Key features include:

3.1 Session Management and Error Handling

from typing import Optional


class FlinkSQLGatewayClient:
    def __init__(self, gateway_url: str = "http://localhost:8083"):
        self.gateway_url = gateway_url.rstrip('/')
        self.session_handle = None
    
    def create_session(self, savepoints: Optional[str] = None) -> str:
        """
        Create a new SQL Gateway session.
        If a savepoint path is given, the job is restored from that point.
        """
        properties = {
            "execution.runtime-mode": "streaming",
            "sql-gateway.session.idle-timeout": "30min",
            "table.exec.resource.default-parallelism": "1"
        }
        
        if savepoints:
            properties["execution.savepoint.path"] = savepoints
        
        # Session creation logic...
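
Under the hood, the client wraps the SQL Gateway REST API. A minimal sketch of the two calls involved, assuming the v1 endpoints (/v1/sessions and /v1/sessions/{handle}/statements); check which REST API version your gateway exposes:

import requests

def _create_session(gateway_url: str, properties: dict) -> str:
    # POST /v1/sessions returns a sessionHandle that all later calls reuse.
    resp = requests.post(f"{gateway_url}/v1/sessions", json={"properties": properties})
    resp.raise_for_status()
    return resp.json()["sessionHandle"]

def _execute_statement(gateway_url: str, session_handle: str, sql: str) -> str:
    # POST /v1/sessions/{handle}/statements submits a statement and returns
    # an operationHandle that can be polled for status and results.
    resp = requests.post(
        f"{gateway_url}/v1/sessions/{session_handle}/statements",
        json={"statement": sql},
    )
    resp.raise_for_status()
    return resp.json()["operationHandle"]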

3.2 Kafka Source Table Definition

Define the Source table to read Kong logs from Kafka:

source_ddl = f"""
CREATE TABLE IF NOT EXISTS source_kong_log (
    `time` STRING,
    `stream` STRING,
    `message` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '{SOURCE_TOPIC}',
    'properties.bootstrap.servers' = '{BOOTSTRAP_SERVER}',
    'properties.group.id' = '{GROUP_ID}',
    'properties.auto.offset.reset' = 'latest',
    
    -- SASL_SSL security configuration
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'SCRAM-SHA-512',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="{KAFKA_USER}" password="{KAFKA_PASSWORD}";',
    'properties.ssl.truststore.location' = '/opt/flink/certs/ca.p12',
    'properties.ssl.truststore.password' = '[YOUR-SSL-KEY]',
    'properties.ssl.truststore.type' = 'PKCS12',
    
    -- JSON format configuration
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'json.ignore-parse-errors' = 'true'
)
"""

client.execute_sql(source_ddl)
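
For reference, the json format deserializes records whose field names match the source columns. A hypothetical example of one record (the real Kong log line will differ in detail):

# Hypothetical Kafka record; the keys must match the columns of source_kong_log.
sample_record = {
    "time": "2026-02-10T08:30:21.123Z",
    "stream": "stdout",
    "message": '10.244.1.7 - - [10/Feb/2026:08:30:21 +0000] '
               '"GET /api/v1/users?limit=10 HTTP/1.1" 200 0.012 "demo01-jupyter.example.com"'
}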

3.3 StarRocks Sink Table Definition

Define the StarRocks table to store parsed data:

sink_ddl = f"""
CREATE TABLE IF NOT EXISTS sink_starrocks (
    `log_time` STRING,
    `stream` STRING,
    `client_ip` STRING,
    `response_time` DOUBLE,
    `host_domain` STRING,
    `request_url` STRING,
    `status_code` INT,
    `project_name` STRING,
    `original_message` STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kube-starrocks-fe-service.demo01-star3.svc.cluster.local:9030/quickstart?useSSL=false',
    'table-name' = '{SR_TABLE_NAME}',
    'username' = '{SR_USERNAME}',
    'password' = '{SR_PASSWORD}'
)
"""

client.execute_sql(sink_ddl)

StarRocks Table Schema (must be created in advance):

CREATE TABLE quickstart.KONG_ACCESS_LOG (
    log_time DATETIME,
    stream VARCHAR(20),
    client_ip VARCHAR(45),
    response_time DOUBLE,
    host_domain VARCHAR(255),
    request_url VARCHAR(1000),
    status_code INT,
    project_name VARCHAR(100),
    original_message STRING
)
DUPLICATE KEY(log_time)
DISTRIBUTED BY HASH(log_time) BUCKETS 10
PROPERTIES (
    "replication_num" = "1"
);

3.4 Real-time ETL Query

The core ETL query. It parses Kafka messages and sends them to StarRocks:

INSERT INTO sink_starrocks
SELECT
    -- UTC → KST time conversion
    CONVERT_TZ(
        REPLACE(REPLACE(`time`, 'T', ' '), 'Z', ''),
        'UTC',
        'Asia/Seoul'
    ) AS log_time,
    
    `stream`,
    
    -- Extract Client IP
    SPLIT_INDEX(message, ' ', 0) AS client_ip,
    
    -- Extract and cast Response Time
    CAST(
        REGEXP_EXTRACT(message, 'HTTP/[0-9.]+" [0-9]+ ([0-9.]+)', 1) 
        AS DOUBLE
    ) AS response_time,
    
    -- Extract Host Domain
    REGEXP_EXTRACT(
        message, 
        'HTTP/[0-9.]+" [0-9]+ [-0-9. ]+ "(.*?)"', 
        1
    ) AS host_domain,
    
    -- Extract Request URL (remove query string)
    SPLIT_INDEX(
        REGEXP_EXTRACT(message, '"(GET|POST|PUT|DELETE|HEAD|OPTIONS) (.*?) HTTP', 2),
        '?', 
        0
    ) AS request_url,
    
    -- Extract HTTP Status Code
    CAST(
        REGEXP_EXTRACT(message, 'HTTP/[0-9.]+" ([0-9]{3})', 1) 
        AS INT
    ) AS status_code,
    
    -- Extract Project Name (derived from host_domain)
    CASE 
        WHEN REGEXP_EXTRACT(message, 'HTTP/[0-9.]+" [0-9]+ [-0-9. ]+ "(.*?)"', 1) LIKE '%-%'
        THEN SPLIT_INDEX(
            REGEXP_EXTRACT(message, 'HTTP/[0-9.]+" [0-9]+ [-0-9. ]+ "(.*?)"', 1), 
            '-', 
            0
        )
        ELSE 'platform'
    END AS project_name,
    
    -- Preserve original message (for debugging)
    message AS original_message

FROM source_kong_log

-- Filter conditions
WHERE 
    `stream` = 'stdout'  -- Filter only standard output logs
    AND message LIKE '%HTTP/%'  -- Filter only HTTP protocol logs
    AND REGEXP_EXTRACT(message, '^[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}') IS NOT NULL  -- IP format validation
    AND SPLIT_INDEX(message, ' ', 0) <> '127.0.0.1'  -- Exclude API Gateway self-generated messages
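
To make the extraction rules concrete, here is how the same patterns behave on a hypothetical log line, using Python's re module purely for illustration (the line's shape is an assumption consistent with the regexes above, not the exact Kong log format):

import re

# Hypothetical access-log line shaped to match the patterns used in the ETL query.
msg = ('10.244.1.7 - - [10/Feb/2026:08:30:21 +0000] '
       '"GET /api/v1/users?limit=10 HTTP/1.1" 200 0.012 "demo01-jupyter.example.com"')

client_ip     = msg.split(' ')[0]                                                   # '10.244.1.7'
response_time = re.search(r'HTTP/[0-9.]+" [0-9]+ ([0-9.]+)', msg).group(1)          # '0.012'
host_domain   = re.search(r'HTTP/[0-9.]+" [0-9]+ [-0-9. ]+ "(.*?)"', msg).group(1)  # 'demo01-jupyter.example.com'
request_url   = re.search(r'"(GET|POST|PUT|DELETE|HEAD|OPTIONS) (.*?) HTTP', msg).group(2).split('?')[0]  # '/api/v1/users'
status_code   = int(re.search(r'HTTP/[0-9.]+" ([0-9]{3})', msg).group(1))           # 200
project_name  = host_domain.split('-')[0] if '-' in host_domain else 'platform'     # 'demo01'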

3.5 Pipeline Execution

print(">>> [3/3] Submitting Streaming Job...")
result = client.execute_sql(etl_sql)
print(f"\n[Success] Flink Job Submitted Successfully!")
print(f"Job ID: {result.get('operationHandle')}")

3.6 Savepoint Utilization (Failure Recovery)

Flink's Savepoint feature allows restarting the pipeline from a specific point:

Savepoints can be viewed in the Flink dashboard.

[Screenshot: savepoints in the Flink dashboard]

# Specify savepoint path when restarting
client.create_session("/opt/flink/savepoint-16547c-c710075ca404")
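
Savepoints themselves are triggered against the JobManager, for example via the standard Flink REST API endpoint /jobs/{jobid}/savepoints. A minimal sketch (JobManager URL, job ID, and target directory are placeholders):

import requests

JOBMANAGER_URL = "http://flink-jobmanager:8081"   # placeholder
JOB_ID = "<running-job-id>"                       # shown in the Flink dashboard

# Ask the JobManager to take a savepoint without cancelling the job.
resp = requests.post(
    f"{JOBMANAGER_URL}/jobs/{JOB_ID}/savepoints",
    json={"target-directory": "/opt/flink/savepoints", "cancel-job": False},
)
resp.raise_for_status()
print("Savepoint trigger id:", resp.json()["request-id"])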

4. Dashboard Creation

4.1 Previous Case: Complex Dataset SQL

This query is executed with every dashboard refresh and includes complex parsing logic:

SELECT 
    kst_time,
    split_part(message, ' ', 1) AS client_ip,
    regexp_extract(message, 'HTTP/[0-9.]+" [0-9]+ ([0-9.]+)', 1) AS response_time,
    regexp_extract(message, 'HTTP/[0-9.]+" [0-9]+ [-0-9. ]+ "(.*?)"', 1) AS host_domain,
    CASE 
        WHEN regexp_extract(message, 'HTTP/[0-9.]+" [0-9]+ [-0-9. ]+ "(.*?)"', 1) LIKE '%-%' 
        THEN split_part(regexp_extract(message, 'HTTP/[0-9.]+" [0-9]+ [-0-9. ]+ "(.*?)"', 1), '-', 1)
        ELSE 'platform' 
    END AS project_name,
    split_part(regexp_extract(message, '"(GET|POST|PUT|DELETE|HEAD|OPTIONS) (.*?) HTTP', 2), '?', 1) AS request_url,
    regexp_extract(message, 'HTTP/[0-9.]+" ([0-9]{3})', 1) AS status_code,
    message
FROM 
    quickstart.kong_log_events
WHERE 
    stream = 'stdout'
    AND message LIKE '%HTTP/%'
    AND message REGEXP '^[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}'
    AND split_part(message, ' ', 1) != '127.0.0.1'
    {% if from_dttm %}
        AND kst_time >= '{{ from_dttm }}'
    {% endif %}
    {% if to_dttm %}
        AND kst_time < '{{ to_dttm }}'
    {% endif %}

Issues:

  • 7 calls to regexp_extract
  • 4 calls to split_part
  • Duplicate execution of identical regex patterns
  • Parsing thousands to tens of thousands of logs with every dashboard refresh

[Screenshot: 2026-02-10 182304.png]

4.2 Current Case: Simple Dataset SQL

A simple query that retrieves pre-parsed data from Flink:

SELECT 
    log_time,
    client_ip,
    response_time,
    host_domain,
    project_name,
    request_url,
    status_code,
    original_message
FROM quickstart.KONG_ACCESS_LOG
WHERE 1 = 1
{% if from_dttm %}
    AND log_time >= '{{ from_dttm }}'
{% endif %}
{% if to_dttm %}
    AND log_time < '{{ to_dttm }}'
{% endif %}

Improvements:

  • 0 regex/string function calls (all removed)
  • Significant performance improvement with simple index scans
  • Query complexity reduced by approximately 90%

[Screenshot: 2026-02-10 182243.png]

4.3 Performance Comparison

| Metric | Previous (StarRocks Connector) | Current (Flink Pipeline) |
| --- | --- | --- |
| Query Complexity | Complex (11 function calls) | Simple (SELECT only) |
| Dashboard Load Time | ~1-2 seconds | ~0.5 seconds |
| CPU Usage | High | Low |
| Storage Space | 100% (all logs) | ~50% (filtered logs) |
| Scalability | Increased query load | Flink scale-out possible |

5. Conclusion

By introducing Apache Flink to streamline the real-time log analysis pipeline, we achieved the following results:

5.1 Key Achievements

  1. Reduced System Load

    • 90% reduction in Superset query complexity
    • 70% improvement in dashboard response time
    • Decreased StarRocks CPU usage
  2. Improved Data Quality

    • Data validation and filtering at the Flink stage
    • 50% storage savings by removing unnecessary logs
    • Data consistency ensured through structured schema
  3. Increased Operational Efficiency

    • Failure recovery through Flink Savepoints
    • Centralized parsing logic (managed in one place in Flink SQL)
    • Faster Superset chart development

5.2 Final Remarks

The Kafka-Flink-StarRocks stream processing pipeline embodies the principle of "process once, query many times." In environments that require real-time data analysis, introducing Flink can significantly improve overall system efficiency.

This is especially true when transforming unstructured text into structured data, as in log analysis, where Flink SQL's string processing functions and real-time streaming capabilities prove tremendously helpful.

