
Better integrate Spark SQL with ETL version 6 for distributed, scalable batch ETL pipeline processing #5

Description

@ChiefsBestPal

Priority: Medium
Risk: Low (if started from a fork/branch of the existing ETL V6)
Work quantity: Medium-High

Description:

Leverage Apache Spark SQL and DataFrames to scale the BookScrapeDB ETL pipeline so it can efficiently process millions of books and reviews with distributed computing.

Current State

  • Python-based ETL that processes several file formats at different steps, from Goodreads, the Google Books API, Open Library, ISBN APIs, and other relevant crawled external sites
  • MySQL warehouse loading with custom DML generators
  • Able to crawl and process concurrently on the server cluster, but parallelism in CPU-bound processing is limited by the Python GIL, especially at ~1-10M records
  • GCP data lake loading: batch-load CSV, Parquet, or intermediate crawler files directly into Google Cloud Storage to run against BigQuery/Cloud SQL schemas...
  • For the cloud, the safest approach is to run the ETL locally with the latest DML generators, confirm the load works against the local DBMS first, and only then load the SQL into GCP partitions

Proposed Enhancement

Integrate Spark SQL + DataFrames for:

  1. Distributed data ingestion from multiple sources
  2. Parallel transformation of book metadata, reviews, ratings
  3. Optimized loading into MySQL/Neo4j with partitioning
  4. Unified query interface across CSV, Parquet, JSON sources
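Points 2 and 3 both rest on hash partitioning. Spark's `repartition(n, col)` assigns each row to a partition from a hash of the key (Spark uses Murmur3), so all rows sharing an ISBN land in the same partition and can then be deduplicated and loaded in parallel. The plain-Python sketch below only models that idea: `hash_partition`, `partition_for`, and the sample records are hypothetical, and `zlib.crc32` stands in for Spark's Murmur3 hash.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition index with a stable hash.

    crc32 is used because Python's built-in hash() of str is randomized per
    process; Spark itself uses Murmur3 for this step.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def hash_partition(records: list, key_field: str, num_partitions: int) -> list:
    """Split records into num_partitions buckets by the hash of key_field."""
    buckets = defaultdict(list)
    for record in records:
        buckets[partition_for(record[key_field], num_partitions)].append(record)
    return [buckets[i] for i in range(num_partitions)]


# Hypothetical rows: two sources describe the same ISBN
books = [
    {"isbn_13": "9780306406157", "source": "goodreads"},
    {"isbn_13": "9780306406157", "source": "google_books"},
    {"isbn_13": "9780804429573", "source": "goodreads"},
    {"isbn_13": "9781111111111", "source": "openlibrary"},
]

parts = hash_partition(books, "isbn_13", 4)
for i, part in enumerate(parts):
    # Both rows for 9780306406157 always share a partition index
    print(i, [(r["isbn_13"], r["source"]) for r in part])
```

In the pipeline this would correspond to something like `unified_books.repartition(8, "book_id")` before the JDBC write; the partition count is an example value to tune.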

Technical Approach

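import os

from pyspark.sql import SparkSession

# Initialize Spark. Assumes the needed connectors are on the classpath: hadoop-aws
# for s3a:// paths, the GCS connector for gs:// paths (preinstalled on Dataproc),
# and MySQL Connector/J for the JDBC write (e.g. via spark.jars.packages).
spark = SparkSession.builder \
    .appName("BookScrapeDB_ETL") \
    .config("spark.sql.warehouse.dir", "/warehouse") \
    .getOrCreate()

# Read multi-source data; multiline=true is needed when a JSON file holds an
# array or pretty-printed objects instead of one JSON object per line
books_df = spark.read \
    .option("multiline", "true") \
    .json("s3a://bookscrape/goodreads/*.json")

google_df = spark.read.json("gs://bookscrape/google_books/*.json")
openlib_df = spark.read.parquet("s3a://bookscrape/openlibrary/*.parquet")

# Register as temp views so they can be queried with Spark SQL
books_df.createOrReplaceTempView("goodreads_books")
google_df.createOrReplaceTempView("google_books")
openlib_df.createOrReplaceTempView("openlibrary_books")

# SQL-based transformation and cross-source deduplication: Goodreads and
# Google Books rows sharing an isbn_13 are merged into one row; rows with a
# NULL isbn_13 never match and pass through unmerged
unified_books = spark.sql("""
    SELECT
        COALESCE(g.book_gid, gb.volume_id) AS book_id,
        COALESCE(g.title, gb.title) AS title,
        g.averageRating,
        gb.description,
        g.book_gid,
        gb.volume_id,
        CASE
            WHEN g.book_gid IS NOT NULL AND gb.volume_id IS NOT NULL THEN 'merged'
            WHEN g.book_gid IS NOT NULL THEN 'goodreads'
            ELSE 'google_books'
        END AS source
    FROM goodreads_books g
    FULL OUTER JOIN google_books gb
        ON g.isbn_13 = gb.isbn_13
""")

# Write to MySQL over JDBC. Each DataFrame partition is written over its own
# connection; numPartitions caps that parallelism (Spark coalesces down to it)
# and batchsize sets rows per JDBC batch insert (both are example values).
# Credentials come from environment variables (placeholder names).
unified_books.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/bookscrapedb?rewriteBatchedStatements=true") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "book") \
    .option("user", os.environ["MYSQL_USER"]) \
    .option("password", os.environ["MYSQL_PASSWORD"]) \
    .option("numPartitions", 8) \
    .option("batchsize", 10000) \
    .mode("append") \
    .save()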
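To make the merge semantics of the Spark SQL query concrete, here is a plain-Python model of `FULL OUTER JOIN ... ON g.isbn_13 = gb.isbn_13` combined with `COALESCE`, limited to `book_id` and `title`. It is a hypothetical illustration (function names and sample rows are made up), not part of the pipeline, and it assumes the Google Books side has already been reduced to one row per ISBN.

```python
from typing import Optional


def coalesce(*values):
    """Return the first non-None argument, like SQL COALESCE."""
    return next((v for v in values if v is not None), None)


def combine(g: Optional[dict], gb: Optional[dict]) -> dict:
    """Build one output row from a Goodreads row and/or a Google Books row."""
    g, gb = g or {}, gb or {}
    return {
        "book_id": coalesce(g.get("book_gid"), gb.get("volume_id")),
        "title": coalesce(g.get("title"), gb.get("title")),
    }


def full_outer_merge(goodreads: list, google_books: list) -> list:
    """Model of FULL OUTER JOIN on isbn_13 followed by COALESCE.

    A None isbn_13 never matches anything, mirroring SQL NULL comparison.
    """
    gb_by_isbn = {r["isbn_13"]: r for r in google_books if r["isbn_13"] is not None}
    matched = set()
    rows = []
    for g in goodreads:
        gb = gb_by_isbn.get(g["isbn_13"]) if g["isbn_13"] is not None else None
        if gb is not None:
            matched.add(g["isbn_13"])
        rows.append(combine(g, gb))  # every left-side row, matched or not
    for gb in google_books:
        if gb["isbn_13"] is None or gb["isbn_13"] not in matched:
            rows.append(combine(None, gb))  # right-side rows with no match
    return rows


goodreads = [
    {"book_gid": "gr-1", "isbn_13": "9781111111111", "title": None},
    {"book_gid": "gr-2", "isbn_13": None, "title": "Book B"},
]
google_books = [
    {"volume_id": "gb-A", "isbn_13": "9781111111111", "title": "Book A"},
    {"volume_id": "gb-C", "isbn_13": "9783333333333", "title": "Book C"},
]

for row in full_outer_merge(goodreads, google_books):
    print(row)
# {'book_id': 'gr-1', 'title': 'Book A'}
# {'book_id': 'gr-2', 'title': 'Book B'}
# {'book_id': 'gb-C', 'title': 'Book C'}
```

The first row shows `COALESCE` falling back to the Google Books title when the Goodreads title is NULL; the second shows that a row without an ISBN is never merged, which is why ISBN normalization matters before the join.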

Benefits

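  • Faster processing of large datasets by scaling out across executors (the 10-100x estimate depends on cluster size and workload and should be benchmarked)
  • Fault tolerance: failed tasks are retried automatically, lost partitions are recomputed from lineage, and checkpointing can truncate long lineages
  • Schema validation with explicit DataFrame schemas (enforced at runtime in PySpark; compile-time type safety needs the Scala/Java Dataset API)
  • Lower I/O and memory use with columnar Parquet storage (column pruning, predicate pushdown) and Spark's columnar in-memory cache
  • Unified interface: the same code runs locally and on a cluster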

Implementation Tasks

  • Set up PySpark environment (local/cluster)
  • Convert existing CSV readers to Spark DataFrames
  • Implement Spark SQL transformations for book deduplication
  • Add JDBC connectors for MySQL batch loading
  • Configure Parquet caching for intermediate results
  • Add unit tests with small Spark datasets
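For the deduplication task, the `isbn_13` join key only works if ISBNs are canonical, since sources mix ISBN-10 and ISBN-13 values with hyphens and spaces. Below is a minimal sketch of a hypothetical `normalize_isbn13` helper that validates check digits and converts ISBN-10 to the 978-prefixed ISBN-13 form. Wiring it in with `pyspark.sql.functions.udf(normalize_isbn13, StringType())` and applying it to each source before the join is an assumption about how it would fit the pipeline, not an existing part of ETL V6.

```python
import re
from typing import Optional


def isbn13_check_digit(first12: str) -> int:
    """ISBN-13 check digit: weights alternate 1, 3 and the total must be a multiple of 10."""
    total = sum(int(d) * (1 if i % 2 == 0 else 3) for i, d in enumerate(first12))
    return (10 - total % 10) % 10


def normalize_isbn13(raw: Optional[str]) -> Optional[str]:
    """Return a canonical ISBN-13 (digits only), or None if raw is not a valid ISBN-10/13."""
    if raw is None:
        return None
    s = re.sub(r"[\s-]", "", raw).upper()
    if re.fullmatch(r"[0-9]{9}[0-9X]", s):
        # ISBN-10 is valid when sum(weight * digit), weights 10 down to 1, is divisible by 11
        total = sum((10 - i) * (10 if c == "X" else int(c)) for i, c in enumerate(s))
        if total % 11 != 0:
            return None
        core = "978" + s[:9]  # drop the ISBN-10 check digit and add the 978 prefix
        return core + str(isbn13_check_digit(core))
    if re.fullmatch(r"[0-9]{13}", s):
        return s if isbn13_check_digit(s[:12]) == int(s[12]) else None
    return None


for raw in ["0-306-40615-2", "080442957X", "978-0-306-40615-7", "0-306-40615-3"]:
    print(raw, "->", normalize_isbn13(raw))
# 0-306-40615-2 -> 9780306406157
# 080442957X -> 9780804429573
# 978-0-306-40615-7 -> 9780306406157
# 0-306-40615-3 -> None
```

Returning None for invalid values keeps bad ISBNs out of the join (they behave like NULLs), which is safer than merging two books on a mistyped key. The same function is easy to cover in the "unit tests with small Spark datasets" task.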

Compatibility Notes

  • Python (PySpark): full DataFrame API; the typed Dataset API is not available (Python's dynamic typing makes it largely unnecessary)
  • Java/Scala: Consider if type-safe Dataset API needed for complex transformations
  • GCP integration: works with GCS through the Cloud Storage connector and with BigQuery through the spark-bigquery connector

Related

  • MySQL warehouse schema and migration of the server_cluster MySQL to GCP Cloud SQL/BigQuery
  • Multi-source data validation
  • Cloud deployment (GCP Dataproc)

References

https://spark.apache.org/sql/
https://spark.apache.org/docs/latest/sql-programming-guide.html

Metadata


Assignees

No one assigned

    Labels

    Big Data & Scaling, ETL (Data processing / ETL pipelines, DML generators, processes between crawl, scrape, APIs, ingest), Epic / Core Story, SQL, enhancement (New feature or request), spark

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
