Spark Data Platform
The result
Introduction
1. Design architecture

The design architecture is separated into three parts:
Data collection
Spark cluster
Serving part
In the data collection phase, we gather information by scraping data from IMDb.com and incorporating some sample data from Google Sheets, which is curated by Kaggle. Subsequently, we store this data in local storage (AWS S3 should be the preferred choice in a production environment). Pandas supports the manipulation of dataframes throughout this process.
To ensure a well-organized runtime environment, the entire procedure is containerized with Docker and orchestrated by Dagster for effective management.
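As a minimal sketch of how the collection step could be expressed as a Dagster asset (the DataFrame contents and the local storage path here are illustrative assumptions, not the project's actual code):
import pandas as pd
from dagster import asset

@asset
def movies() -> None:
    # Hypothetical sketch: in the real pipeline the rows come from the IMDb scraper
    df = pd.DataFrame([{"movie_id": "tt0111161", "score": 9.3, "year": 1994}])
    # Persist to local storage (AWS S3 would be preferred in production)
    df.to_parquet("data/raw/movies.parquet")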
For the Spark cluster part, we set up and self-host a Spark Cluster with one master and three workers. These workers operate on personal computers contributed by our team members, creating a collaborative and distributed computing setup.

As shown in the image, the workers are Chinh, Hao, Nhan, and Ngoc.
Total RAM: 77.7 GB
Total cores: 40
In the serving phase, we set up a MySQL database populated with the movie data. An API is then deployed to serve the retrieved movie information, complemented by a machine-learning-based search engine built on TF-IDF.

2. Scraping data from IMDB.com
To obtain the data, we observed that IMDb is no longer a static website since it was acquired by Amazon. While they offer an alternative through a GraphQL API, the associated costs exceed our budget.
In response to this challenge, we developed a web scraper to automatically gather movie data.
Data pipeline
Additionally, we constructed a data pipeline to seamlessly store the scraped data in object storage. This enhances the efficiency of the process, simplifying the transfer of data from the source (imdb.com) to our data platform.
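As an illustration of the scraping approach, here is a minimal sketch using requests and BeautifulSoup (the URL, headers, and selector are assumptions for illustration; the project's actual scraper may differ):
import requests
from bs4 import BeautifulSoup

# A descriptive User-Agent helps avoid being blocked immediately
headers = {"User-Agent": "Mozilla/5.0 (data-platform-scraper)"}
url = "https://www.imdb.com/title/tt0111161/"  # example movie page
resp = requests.get(url, headers=headers, timeout=30)
soup = BeautifulSoup(resp.text, "html.parser")

# Assumed selector for illustration only; real IMDb markup changes frequently
title = soup.find("h1").get_text(strip=True)
print(title)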

Raw files

Raw assets include:
movies: The movie metadata (movie_id, score, duration, director_name, actor_1_name, actor_2_name, actor_3_name, num_reviews, num_critics, num_votes, metascore, language, global_gross, year).
thumbnails: The thumbnail corresponding to each movie.
reviews: Reviews of a movie, including both positive and negative reviews.
pretrained_reviews: A dataset taken from Kaggle, combined with reviews to give better training results.
Staging assets

cleaned_movies: Raw movies processed, cleaned, and wrangled with PySpark DataFrames to handle a large amount of data.
Modelling assets

tf_idf: TF-IDF model trained on the overview and title columns of the movies data, used to support the search engine in the serving application.
3. Processing data
Here's the notebook version (also known as Exploratory Data Analysis) of movies.
The raw dataset is dirty, so we need to clean it first.
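Before the cleaning steps below, the raw movies are loaded into a Spark DataFrame, roughly like this (the path and CSV format are assumptions; the notebook may read the data differently):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MoviesEDA").getOrCreate()
# Hypothetical path to the raw scraped movies
movies_df = spark.read.csv("data/raw/movies.csv", header=True, inferSchema=True)
movies_df.printSchema()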

Convert duration to true type.
""" Convert duration to minutes """
movies_df = movies_df.withColumn("duration", regexp_replace("duration", "h", ":")).withColumn("duration", regexp_replace("duration", "m", ""))
# Rows with NULL will turn to 0:00
movies_df = movies_df.withColumn("duration", when(col("duration") == "", "0:00").otherwise(col("duration")))
# Add 0: to the duration if it's just minutes
movies_df = movies_df.withColumn("duration", when(col("duration").contains(":"), col("duration")).otherwise("0:" + col("duration")))
# Remove any spaces in the duration
movies_df = movies_df.withColumn("duration", regexp_replace("duration", " ", ""))
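If total minutes are needed downstream, the resulting h:mm string can be split further; this extra step is only a sketch and is not part of the original pipeline:
from pyspark.sql.functions import col, split

# Optional follow-up (not in the original pipeline): turn "h:mm" into total minutes
parts = split(col("duration"), ":")
movies_df = movies_df.withColumn(
    "duration_minutes", parts.getItem(0).cast("int") * 60 + parts.getItem(1).cast("int")
)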
Then use a UDF (user-defined function) to convert numbers to their true types.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Define the UDF function
def convert_to_number_udf(x):
    key_decimals = {'K': 1000, 'M': 1000000, 'B': 1000000000}
    if isinstance(x, str):
        x = x.replace(',', '')
        if x[-1] in key_decimals.keys():
            change = float(x[:-1]) * key_decimals[x[-1]]
            return int(change)
        else:
            return int(x)
    else:
        return x

# Register the UDF
convert_to_number_spark_udf = udf(convert_to_number_udf, IntegerType())

""" Convert number of reviewed users, number of reviewed critics, number of votes """
movies_df = movies_df.withColumn("num_reviews", convert_to_number_spark_udf(movies_df["num_reviews"]))
movies_df = movies_df.withColumn("num_critics", convert_to_number_spark_udf(movies_df["num_critics"]))
movies_df = movies_df.withColumn("num_votes", convert_to_number_spark_udf(movies_df["num_votes"]))
Other conversions.
# Check budget column
movies_df.describe("budget").show()
# Percentage of NULL values in budget column
percentage = movies_df.filter(col("budget").isNull()).count() / movies_df.count()
""" Convert global gross to integer """
movies_df = movies_df.withColumn("global_gross", regexp_replace("global_gross", "[^0-9]", ""))\
.withColumn("global_gross", col("global_gross").cast("integer"))
""" Convert year to integer """
movies_df = movies_df.withColumn("year", col("year").cast("integer"))
Now the schema looks better.

Check missing values in columns.
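One way to count the missing values per column in a single pass (a sketch; the notebook may present this differently):
from pyspark.sql.functions import col, count, when

# Count NULLs per column
movies_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in movies_df.columns]
).show()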

Then we can gain some insights by generating visualizations; see the notebook for a detailed view.
4. Modelling the search engine
Here's the notebook version.
Initially, we loaded the dataframe df from the Spark /tmp folder on all workers, ensuring uniformity of the data among them. If any worker did not have identical data, it would result in an error.

Then we extract only title and overview for our TF-IDF model.
Before the tokenization process, we eliminate NaN data entirely, i.e., values that are either empty or null as obtained during the crawling process.
df = df.filter((col("overview").isNotNull()) & (col("overview") != ''))
TF-IDF Model
TF-IDF Formula

idf(t, D) = log( N / |{d ∈ D : t ∈ d}| )

N is the number of rows in the data frame:
total_docs = df.count()
|{d ∈ D : t ∈ d}| is the number of documents where the term t appears (i.e., tf(t, d) ≠ 0). If the term is not in the corpus, this will lead to a division by zero. It is therefore common to adjust the numerator to 1 + N and the denominator to 1 + |{d ∈ D : t ∈ d}|.
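As a quick illustration with made-up numbers: if total_docs = 1000 and a term appears in 100 of those documents, then idf = log(1000 / 100) = log(10) ≈ 2.30 (math.log uses the natural logarithm), so rarer terms receive higher weights.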
Tokenizer
import re

def tokenize(text):
    return re.findall('\\w+', text.lower())
This function splits each overview into lowercase word tokens, preparing the data for the tf dataframe.
Apply the tokenizer to the df dataframe:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

tokenize_udf = F.udf(tokenize, ArrayType(StringType()))
# tokenize all the text
data = df.select(['title', tokenize_udf('overview').alias('overview')])
# make 1 separate row for each token
data_tokens = data.withColumn("token", F.explode('overview'))
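For instance (an illustrative row, not real data), a movie whose tokenized overview is ['space', 'crew'] becomes two rows in data_tokens: one with token 'space' and one with token 'crew'.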
Calculate TF
# calculate term frequency
tf = data_tokens.groupBy('title', 'token').agg(F.count('overview').alias('tf'))
Calculate DF
# calculate document frequency
df = data_tokens.groupBy('token').agg(F.countDistinct('title').alias('df'))
Calculate IDF - Inverse Doc Frequency
import math

from pyspark.sql.types import FloatType

# utility method for calculating inverse document frequency
def inverse_doc_frequency(doc_frequency):
    return math.log(total_docs / doc_frequency)

# register inverse document frequency as a udf
inverse_doc_frequency_udf = F.udf(inverse_doc_frequency, FloatType())
# calculate the inverse document frequency
idf = df.withColumn('idf', inverse_doc_frequency_udf('df'))
Calculate TF-IDF by multiplying TF with IDF
# calculate tfidf
tfidf = tf.join(idf, 'token').withColumn('tfidf', F.col('tf') * F.col('idf'))
Making Search
def search(query, N):
    # tokenize query into terms
    terms = tokenize(query)
    # create a dataframe with each term as a separate row
    query_tokens = spark.createDataFrame(
        terms, StringType()).withColumnRenamed('value', 'token')
    # get aggregated score and count for each document for all the matched tokens
    result = query_tokens.join(tfidf, 'token').groupBy('title').agg(
        F.sum('tfidf').alias('score_sum'), F.count('tfidf').alias('matched_terms'))
    # calculate document score
    result = result.withColumn('score', F.col(
        'score_sum') * F.col('matched_terms') / len(terms))
    # show top N documents
    result.select('title', 'score').sort(F.col('score').desc()).show(N, False)
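For example, calling the function with a hypothetical query prints the top matches:
# Hypothetical query: prints the 10 best-matching titles
search("space adventure", 10)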
Get the top N results with the highest TF-IDF scores, i.e., the documents most similar to the query.

5. Serving
To maximize the benefits of our search engine model, we've created a user-friendly REST API endpoint. This endpoint takes a string input and, in return, offers a list of movies derived from the model's outcomes.
For a straightforward yet efficient solution, we've chosen FastAPI.
Database
To support the backend API written in FastAPI, we created a MySQL database.

The image above shows the Docker Compose script that creates a MySQL container storing the necessary information about movies.
Building API
In this implementation, we've developed a simple REST API using FastAPI, with uvicorn serving as the application runtime.
import uvicorn

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)
Before launching the backend, it's essential to initialize a Spark application and load the tfidf model, which is crucial for the functionality of the /search endpoint.
from pyspark.sql import SparkSession

# create spark application
spark = (
    SparkSession.builder.appName("Movies")
    .master("spark://192.168.194.64:7077")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)
sc = spark.sparkContext

# read tfidf
tfidf_path = "../pipeline_data_platform/data/model/tfidf.parquet"
tfidf = spark.read.parquet(tfidf_path)
However, the current implementation yields a list of movie titles as the output of the search engine model. For a more practical real-world application, the API's response should encompass detailed information about the movies, rather than merely their titles.
To achieve this, the procedure involves retrieving results from the search engine model, querying the database to obtain corresponding movie details, and subsequently returning the acquired movie information.
To facilitate database sessions for movie querying within the endpoint, sqlalchemy is employed:
from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session

engine = create_engine(
    SQLALCHEMY_DATABASE_URL
)
Base = automap_base()
Base.prepare(engine, reflect=True)

def get_db():
    try:
        db = Session(engine)
        yield db
    finally:
        db.close()
The get_db function is used to obtain a database session. Finally, the API endpoint is created as follows:
@app.get("/search")
def search_movie(q: str = Query(), db: Session = Depends(get_db)):
movie_titles = search(spark, tfidf, q, 10)
movies = []
for title in movie_titles:
movie = db.query(ModelMovie).filter(ModelMovie.title == title).first()
movies.append(movie)
return movies
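The ModelMovie class referenced above is the SQLAlchemy-mapped movie table. A minimal sketch of how it could be obtained from the reflected metadata, assuming the table is named movies (the actual table name may differ):
# Assumption: the reflected movies table; the real table name may differ
ModelMovie = Base.classes.movies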
Depending on the computational power of the machine, the endpoint is expected to return a list of movie details within a duration ranging from 5 to 30 seconds.
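As an illustration, a client could call the endpoint like this (a sketch that assumes the API is running locally on port 8000, as configured above):
import requests

# Hypothetical query against the locally running FastAPI server
resp = requests.get("http://localhost:8000/search", params={"q": "space adventure"})
print(resp.json())  # list of movie detail records returned by the API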
Application
We've crafted a movie platform reminiscent of Netflix using NextJS. This platform showcases movies obtained through our initial IMDB scraping efforts.
Additionally, we've incorporated a user-friendly search bar. When users input their query and submit it, the platform sends a request to the previously established search engine API. Once the response is received, the platform dynamically renders the movies based on the obtained information.
Getting started
First, install your Dagster code location as a Python package. By using the --editable flag, pip will install your Python package in "editable mode" so that as you develop, local code changes will automatically apply.
pip install -e ".[dev]"
Duplicate the .env.example file and rename it to .env. Then, fill in the values of the environment variables in that file.
Start Dagster UI web server:
python -m dagster dev -h 0.0.0.0
Open http://localhost:3000 using your browser to see the project.
Development
Adding new Python dependencies
You can specify new Python dependencies in setup.py
Unit testing
Unit tests are available in the data_platform_tests directory, and you can run them using pytest:
python -m pytest data_platform_tests
Develop with Spark Cluster
Build docker images
You need to build two images: one for dagster-webserver and dagster-daemon (both use the same image), and one for the pipeline.
docker build -t dagster .
docker build -t pipeline pipeline_data_platform