Spark Data Platform
The design architecture is separated into three parts:
Data collection
Spark cluster
Serving part
In the data collection phase, we gather information by scraping data from IMDb.com and incorporating some sample data from Google Sheets, curated by Kaggle. We then store this data in local storage (AWS S3 would be the preferred choice in a production environment). Pandas supports dataframe manipulation throughout this process.
The entire procedure is containerized with Docker for a well-organized runtime environment and orchestrated by Dagster for effective management.
For the Spark cluster part, we set up and self-host a Spark Cluster with one master and three workers. These workers operate on personal computers contributed by our team members, creating a collaborative and distributed computing setup.
As shown in the image, the workers are: Chinh, Hao, Nhan, Ngoc.
Total RAM: 77.7 GB
Total cores: 40
In the serving phase, we establish a MySQL database using movie data. An API is then deployed to serve the retrieved movie information, complemented by a search engine backed by machine learning utilizing TF-IDF.
To obtain the data, we observed that IMDb is no longer a static website since its acquisition by Amazon. While they offer an alternative through a GraphQL API, the associated costs surpass our budget.
In response to this challenge, we developed a web scraper to automatically gather movie data.
Additionally, we constructed a data pipeline to seamlessly store the scraped data in object storage. This enhances the efficiency of the process, simplifying the transfer of data from the source (imdb.com) to our data platform.
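As a rough illustration of the scraping step that feeds this pipeline, here is a minimal sketch assuming requests and BeautifulSoup; the URL pattern, headers, and selectors are assumptions, not the project's actual scraper:

```python
# Illustrative scraper sketch (not the project's exact code): fetch one
# IMDb title page and pull out a couple of fields.
import requests
from bs4 import BeautifulSoup

def scrape_movie(movie_id: str) -> dict:
    url = f"https://www.imdb.com/title/{movie_id}/"  # assumed URL pattern
    resp = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=30)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    title = soup.find("h1").get_text(strip=True)  # selector depends on IMDb's current markup
    return {"movie_id": movie_id, "title": title}
```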
Raw assets include:
movies: The movie metadata (movie_id, score, duration, director_name, actor_1_name, actor_2_name, actor_3_name, num_reviews, num_critics, num_votes, metascore, language, global_gross, year).
thumbnails: The thumbnail corresponding to a movie.
reviews: Reviews of a movie, including both positive and negative reviews.
pretrained_reviews: A dataset taken from Kaggle, combined with reviews to give better training results.
cleaned_movies: The raw movies processed, cleaned, and wrangled as a PySpark dataframe to handle a large amount of data.
tf_idf: The TF-IDF model, trained on the overview and title columns of the movies data. It supports the search engine for the serving application.
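Since these assets are managed by Dagster, a minimal sketch of how two of them could be declared is shown below; the bodies are placeholders, and the real assets rely on the scraper and PySpark jobs described elsewhere on this page:

```python
# Illustrative Dagster asset definitions (placeholders, not the project's code).
import pandas as pd
from dagster import asset

@asset
def movies() -> pd.DataFrame:
    # In the real pipeline this asset is produced by the IMDb scraper.
    return pd.DataFrame([{"movie_id": "tt0000001", "score": 7.5, "year": 2020}])

@asset
def cleaned_movies(movies: pd.DataFrame) -> pd.DataFrame:
    # In the real pipeline this cleaning step is done with PySpark.
    return movies.dropna(subset=["movie_id"])
```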
Here's the notebook version (also known as Exploratory Data Analysis) of movies.
The first dataset is dirty, so we need to clean it first.
Convert duration to its true type.
Then use a UDF to convert numbers to their true types.
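A sketch of what these conversions might look like in PySpark; the column names come from the schema above, but the duration format ("2h 22m") and the list of numeric columns are assumptions:

```python
# Illustrative PySpark conversions on the raw movies DataFrame.
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# UDF that parses a duration such as "2h 22m" into minutes (format is assumed).
@udf(returnType=IntegerType())
def duration_to_minutes(duration):
    if duration is None:
        return None
    hours = minutes = 0
    for part in duration.split():
        if part.endswith("h"):
            hours = int(part[:-1])
        elif part.endswith("m"):
            minutes = int(part[:-1])
    return hours * 60 + minutes

movies = movies.withColumn("duration", duration_to_minutes("duration"))

# Cast numeric columns that were scraped as strings to their true types.
for col in ["num_reviews", "num_critics", "num_votes", "metascore", "year"]:
    movies = movies.withColumn(col, F.col(col).cast(IntegerType()))
```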
Other conversions.
Now the schema looks better.
Check for missing values in the columns.
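For example, a common PySpark idiom for counting missing values per column (a generic sketch, not the notebook's exact cell):

```python
# Count null values in every column of the movies DataFrame.
from pyspark.sql import functions as F

missing = movies.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in movies.columns]
)
missing.show()
```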
Then we can gain some insight by generating visualizations; see the notebook for a detailed view.
Here's the notebook version.
Initially, we loaded the dataframe df from the Spark \tmp\ folder on all workers, ensuring the data is identical among them. If any worker did not have the identical data, it would result in an error.
Then we extract only the title and overview columns for our TF-IDF model.
Before the tokenization process, we eliminate all NaN data, i.e. values that are either empty or null as obtained during the crawling process.
TF-IDF Model
TF-IDF Formula
N is the number of rows in the dataframe, i.e. the number of documents.
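For reference, the standard TF-IDF formulation that this section follows (up to the smoothing adjustment noted at the bottom of the page) is:

```latex
\mathrm{tf}(t, d) = \text{number of occurrences of term } t \text{ in document } d, \qquad
\mathrm{idf}(t, D) = \log \frac{N}{\lvert \{ d \in D : t \in d \} \rvert}, \qquad
\mathrm{tfidf}(t, d, D) = \mathrm{tf}(t, d) \cdot \mathrm{idf}(t, D)
```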
Tokenizer
This function separates the sentences in overview into tokens, preparing for the tf dataframe.
Apply the Tokenizer to the df dataframe.
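One way to do this is with pyspark.ml.feature.Tokenizer (a sketch; the project may instead use a custom tokenizing function, as the text suggests):

```python
# Split the overview text into word tokens.
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="overview", outputCol="tokens")
tokenized_df = tokenizer.transform(df)
tokenized_df.select("title", "tokens").show(5, truncate=False)
```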
Calculate TF
Calculate DF
Calculate IDF (Inverse Document Frequency)
Calculate TF-IDF by multiplying TF with IDF
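A generic sketch of these four steps using DataFrame operations, assuming the tokenized dataframe from the previous step has movie_id and tokens columns; the notebook's exact code may differ:

```python
from pyspark.sql import functions as F

# One row per (movie_id, token).
exploded = tokenized_df.select("movie_id", F.explode("tokens").alias("token"))

# TF: how many times each token appears in each document.
tf = exploded.groupBy("movie_id", "token").count().withColumnRenamed("count", "tf")

# DF: in how many documents each token appears.
df_counts = (
    exploded.select("movie_id", "token").distinct()
    .groupBy("token").count().withColumnRenamed("count", "df")
)

# IDF: log(N / DF), where N is the number of documents.
N = tokenized_df.count()
idf = df_counts.withColumn("idf", F.log(F.lit(N) / F.col("df")))

# TF-IDF: multiply TF with IDF for each (movie_id, token) pair.
tf_idf = tf.join(idf, on="token").withColumn("tf_idf", F.col("tf") * F.col("idf"))
```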
Making Search
Get the top_N results with the highest tf-idf scores, i.e. the movies most similar to the query.
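A sketch of how the lookup could work, assuming the tf_idf frame from the previous step and a movies_df holding movie_id and title; scoring a query by summing the tf-idf weights of its tokens is one simple choice, not necessarily the project's exact method:

```python
from pyspark.sql import functions as F

def search(query: str, top_n: int = 10):
    # Score each movie by the summed tf-idf weight of the query tokens,
    # then return the top_n titles with the highest scores.
    tokens = [t.lower() for t in query.split()]
    scores = (
        tf_idf.filter(F.col("token").isin(tokens))
        .groupBy("movie_id")
        .agg(F.sum("tf_idf").alias("score"))
        .orderBy(F.col("score").desc())
        .limit(top_n)
    )
    return scores.join(movies_df, on="movie_id").select("title", "score").collect()
```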
To maximize the benefits of our search engine model, we've created a user-friendly REST API endpoint. This endpoint takes a string input and, in return, offers a list of movies derived from the model's outcomes.
For a straightforward yet efficient solution, we've chosen FastAPI.
To support the backend API written in FastAPI, we created a MySQL database.
The image above is the Docker Compose script that creates a MySQL container storing the necessary information about movies.
In this implementation, we've developed a simple REST API using FastAPI, with uvicorn serving as the application runtime.
Before launching the backend, it's essential to initialize a Spark application and load the tfidf model, which is crucial for the functionality of the /search endpoint.
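A minimal sketch of that startup step; the storage format and path for the model are assumptions:

```python
# Initialize Spark and load the persisted TF-IDF data before serving requests.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("movie-search-api").getOrCreate()
tf_idf = spark.read.parquet("/tmp/tf_idf")  # assumed location and format
```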
However, the current implementation yields a list of movie titles as the output of the search engine model. For a more practical real-world application, the API's response should encompass detailed information about the movies, rather than merely their titles.
To achieve this, the procedure involves retrieving results from the search engine model, querying the database to obtain corresponding movie details, and subsequently returning the acquired movie information.
To facilitate database sessions for movie querying within the endpoint, sqlalchemy is employed:
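A typical setup of this kind looks roughly as follows; the connection URL, credentials, and database name are placeholders:

```python
# SQLAlchemy engine, session factory, and the get_db dependency used by FastAPI.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "mysql+pymysql://user:password@localhost:3306/movies"  # placeholder
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```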
The get_db function is used to obtain a database session. Finally, the API endpoint is created as follows:
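An illustrative version of that endpoint, assuming a search_movies helper wrapping the TF-IDF model and a Movie ORM model; it is a sketch rather than the project's exact code:

```python
# /search: run the TF-IDF search, then enrich the titles from MySQL.
from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session

app = FastAPI()

@app.get("/search")
def search_endpoint(q: str, db: Session = Depends(get_db)):
    titles = search_movies(q)  # assumed wrapper around the TF-IDF model
    movies = db.query(Movie).filter(Movie.title.in_(titles)).all()  # Movie: assumed ORM model
    return [{"title": m.title, "score": m.score, "year": m.year} for m in movies]
```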
Depending on the computational power of the machine, the endpoint is expected to return a list of movie details within 5 to 30 seconds.
We've crafted a movie platform reminiscent of Netflix using NextJS. This platform showcases movies obtained through our initial IMDB scraping efforts.
Additionally, we've incorporated a user-friendly search bar. When users input their query and submit it, the platform sends a request to the previously established search engine API. Once the response is received, the platform dynamically renders the movies based on the obtained information.
First, install your Dagster code location as a Python package. By using the --editable flag, pip will install your Python package in "editable mode" so that as you develop, local code changes will automatically apply.
Duplicate the .env.example file and rename it to .env. Then fill in the values of the environment variables in that file.
Start the Dagster UI web server:
Open http://localhost:3000 using your browser to see the project.
You can specify new Python dependencies in setup.py
Unit tests are available in the data_platform_tests directory, and you can run them using pytest:
You need to build two images: one for dagster-webserver and dagster-daemon (both use the same image), and one for the pipeline.
|{d ∈ D : t ∈ d}|: the number of documents where the term t appears (i.e., tf(t, d) ≠ 0). If the term is not in the corpus, this will lead to a division by zero. It is therefore common to adjust the numerator to 1 + N and the denominator to 1 + |{d ∈ D : t ∈ d}|.