# Spark Data Platform

## **The result**

{% embed url="https://drive.google.com/file/d/1u9gTG51ibDK6jSB0Vrdr5vi0dpQhNPNd/view?usp=drive_link" %}
Final result
{% endembed %}

## Introduction

### 1. Design architecture

<figure><img src="/files/WJixdUbkmWjABiJvIuXf" alt=""><figcaption><p>Architecture</p></figcaption></figure>

The architecture is separated into three parts:

* Data collection
* Spark cluster
* Serving part

In the data collection phase, we gather information by scraping data from IMDb.com and incorporating sample data from Google Sheets, curated by Kaggle. We then store this data in local storage (AWS S3 would be the preferred choice in a production environment). Pandas handles the DataFrame manipulation throughout this process.

To keep the runtime environment well organized, the entire procedure is containerized with [Docker](https://www.docker.com/) and orchestrated by [Dagster](https://dagster.io/) for effective management.

For the Spark cluster part, we set up and self-host a Spark cluster with one master and three workers. The workers run on personal computers contributed by our team members, creating a collaborative, distributed computing setup.

<figure><img src="/files/1SsBu0hhUsnLJ6yW1ImP" alt=""><figcaption><p>Spark workers</p></figcaption></figure>

As shown in the image, the workers are Chinh, Hao, Nhan, and Ngoc.

* Total RAM: 77.7 GB
* Total cores: 40

In the serving phase, we load the movie data into a MySQL database. An API is then deployed to serve the retrieved movie information, complemented by a search engine backed by a TF-IDF model.

<figure><img src="/files/P5N7DHCuscwgnsMWEAWA" alt=""><figcaption><p>Serving application</p></figcaption></figure>

### 2. Scraping data from IMDB.com

To obtain the data, we observed that IMDb is no longer a static website since its acquisition by Amazon. While they offer an alternative through a GraphQL API, the associated costs exceed our budget.

In response to this challenge, we developed a web scraper to automatically gather movie data.

{% embed url="https://drive.google.com/file/d/1cnogQUY5z9hSucI21F4rut1Qc8t6XrFo/view?usp=drive_link" %}
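
As a rough illustration, one scraping step might look like the sketch below. This is a minimal example, not the project's actual scraper; the URL, headers, and parsed field are assumptions for illustration only:

```python
import requests
from bs4 import BeautifulSoup

# Hypothetical example: fetch a single IMDb title page and read its <title> tag.
# IMDb tends to reject requests without a browser-like User-Agent, so we set one.
headers = {"User-Agent": "Mozilla/5.0"}
url = "https://www.imdb.com/title/tt0111161/"  # assumed example title

response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()

soup = BeautifulSoup(response.text, "html.parser")
print(soup.title.string)  # the page title, which contains the movie name
```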

#### Data pipeline

Additionally, we constructed a data pipeline to store the scraped data in object storage. This improves the efficiency of the process, simplifying the transfer of data from the source (imdb.com) to our data platform.

<figure><img src="/files/xQsa0glaKpM4tOjifp8K" alt=""><figcaption><p>Data pipeline</p></figcaption></figure>
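
Conceptually, each step of this pipeline maps to a Dagster asset. The sketch below is a minimal illustration under assumed names, stub data, and paths, not the project's actual asset code:

```python
import pandas as pd
from dagster import asset


@asset
def movies() -> pd.DataFrame:
    """Raw movie metadata; the real pipeline scrapes this (stub rows here)."""
    df = pd.DataFrame([{"movie_id": "tt0111161", "score": 9.3}])
    df.to_parquet("data/raw/movies.parquet")  # assumed local storage layout
    return df


@asset
def cleaned_movies(movies: pd.DataFrame) -> pd.DataFrame:
    """Downstream asset; Dagster wires the dependency via the argument name."""
    return movies.dropna(subset=["movie_id"])
```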

#### Raw files

<figure><img src="/files/RS9lpbG0y86YBiDaZrJq" alt=""><figcaption><p>Raw assets</p></figcaption></figure>

Raw assets include:

* `movies`: The movie metadata (movie\_id, score, duration, director\_name, actor\_1\_name, actor\_2\_name, actor\_3\_name, num\_reviews, num\_critics, num\_votes, metascore, language, global\_gross, year).
* `thumbnails`: The thumbnail for each movie.
* `reviews`: Reviews of each movie, including both positive and negative reviews.
* `pretrained_reviews`: A dataset taken from Kaggle, combined with `reviews` to improve training results.

#### Staging assets

<figure><img src="/files/3zKtFDs2beb4MBRQhvSi" alt=""><figcaption><p>Staging assets</p></figcaption></figure>

* `cleaned_movies`: The raw movies processed, cleaned, and wrangled with PySpark DataFrames, which handle the large amount of data.

#### Modelling assets

<figure><img src="/files/EsaEZ9D9kcJNEGi9d2Zl" alt=""><figcaption></figcaption></figure>

* `tf_idf`: A TF-IDF model trained on the `overview` and `title` columns of the movies data. It backs the search engine in the serving application.

### 3. Processing data

Here's the notebook version (the exploratory data analysis) of the movies data.

{% embed url="https://drive.google.com/file/d/1OXD7DnESJji0axrKTAHGUsW4HznsOtVm/view?usp=sharing" %}

The raw dataset is dirty, so we need to clean it first.

<figure><img src="/files/noM3jvOdkMH8V5tdwFZh" alt=""><figcaption></figcaption></figure>

First, normalize the `duration` strings (e.g. `2h 22m`) into a consistent `H:MM` form.

```python
from pyspark.sql.functions import col, concat, lit, regexp_replace, when

""" Normalize duration strings into H:MM form """

movies_df = movies_df \
    .withColumn("duration", regexp_replace("duration", "h", ":")) \
    .withColumn("duration", regexp_replace("duration", "m", ""))

# Rows with NULL or empty duration will turn to 0:00
movies_df = movies_df.withColumn(
    "duration",
    when(col("duration").isNull() | (col("duration") == ""), "0:00").otherwise(col("duration")),
)

# Add "0:" to the duration if it's just minutes
movies_df = movies_df.withColumn(
    "duration",
    when(col("duration").contains(":"), col("duration")).otherwise(concat(lit("0:"), col("duration"))),
)

# Remove any spaces in the duration
movies_df = movies_df.withColumn("duration", regexp_replace("duration", " ", ""))
```
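
The docstring mentions minutes, so presumably a final step turns the normalized `H:MM` string into a minute count. A sketch of that step, assuming the normalization above:

```python
from pyspark.sql.functions import col, split

# Split "H:MM" into hours and minutes, then combine into total minutes.
parts = split(col("duration"), ":")
movies_df = movies_df.withColumn(
    "duration",
    parts.getItem(0).cast("integer") * 60 + parts.getItem(1).cast("integer"),
)
```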

Then use a UDF to convert numeric strings (e.g. `1.2K`) to their true types.

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Define the UDF: convert strings like "1.2K" or "3M" into plain integers
def convert_to_number_udf(x):
    key_decimals = {'K': 1000, 'M': 1000000, 'B': 1000000000}
    if isinstance(x, str):
        x = x.replace(',', '')
        if not x:
            return None  # guard against empty strings
        if x[-1] in key_decimals:
            return int(float(x[:-1]) * key_decimals[x[-1]])
        else:
            return int(x)
    else:
        return x

# Register the UDF
convert_to_number_spark_udf = udf(convert_to_number_udf, IntegerType())

""" Convert number of reviewed users, number of reviewed critics, number of votes """
movies_df = movies_df.withColumn("num_reviews", convert_to_number_spark_udf(movies_df["num_reviews"]))
movies_df = movies_df.withColumn("num_critics", convert_to_number_spark_udf(movies_df["num_critics"]))
movies_df = movies_df.withColumn("num_votes", convert_to_number_spark_udf(movies_df["num_votes"]))
```

Other conversions.

```python
# Check budget column
movies_df.describe("budget").show()

# Percentage of NULL values in budget column
percentage = movies_df.filter(col("budget").isNull()).count() / movies_df.count()

""" Convert global gross to integer """
movies_df = movies_df.withColumn("global_gross", regexp_replace("global_gross", "[^0-9]", ""))\
    .withColumn("global_gross", col("global_gross").cast("integer"))
    
""" Convert year to integer """
movies_df = movies_df.withColumn("year", col("year").cast("integer"))
```

Now the schema looks better.

<figure><img src="/files/Tue8yDDqIge4680ye2ZH" alt=""><figcaption><p>Better schema</p></figcaption></figure>

Check missing values in columns.

<figure><img src="/files/5zfN6DfIUFMF9d4EEA9P" alt=""><figcaption></figcaption></figure>

We can then gain some insights by generating visualizations; see the notebook for the detailed view.

### 4. Modelling the search engine

Here's the notebook version.

{% embed url="https://drive.google.com/file/d/1myTL_hrnvktjmkIBkUvzpajjNNmtuqxn/view?usp=sharing" %}

Initially, we load the DataFrame `df` from the same Spark `/tmp` folder on all workers, ensuring the data is uniform among them. If any worker did not hold identical data, the load would fail with an error.

<figure><img src="/files/hQoK9nmRPXnCMAYuE2wo" alt=""><figcaption></figcaption></figure>

Then we extract only the `title` and `overview` columns for our `TF-IDF` model.

Before tokenization, we drop rows whose `overview` is empty or null; these are artifacts of the crawling process.

```python
from pyspark.sql.functions import col

df = df.filter((col("overview").isNotNull()) & (col("overview") != ''))
```

**TF-IDF Model**

1. TF-IDF Formula

<figure><img src="/files/zEBKC9B9mQ6jgQh2J4kM" alt=""><figcaption></figcaption></figure>

where N is the number of documents, i.e. the number of rows in the DataFrame:

```python
total_docs = df.count()
```

$$|\{d \in D : t \in d\}|$$ is the number of documents in which the term $$t$$ appears (i.e., $$\mathrm{tf}(t, d) \neq 0$$). If a term is not in the corpus, this leads to a division by zero, so it is common to adjust the numerator to $$1 + N$$ and the denominator to $$1 + |\{d \in D : t \in d\}|$$:

$$\mathrm{idf}(t, D) = \log\frac{1 + N}{1 + |\{d \in D : t \in d\}|}$$

2. Tokenizer

```python
import re

def tokenize(text):
    return re.findall(r'\w+', text.lower())
```

This function splits each sentence in `overview` into tokens, in preparation for the `tf` DataFrame.

3. Apply Tokenizer to `df` dataframe

```python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

tokenize_udf = F.udf(tokenize, ArrayType(StringType()))
# tokenize all the text
data = df.select(['title', tokenize_udf('overview').alias('overview')])
# make 1 separate row for each token
data_tokens = data.withColumn("token", F.explode('overview'))
```

4. Calculate `TF`

```python
# calculate term frequency
tf = data_tokens.groupBy('title', 'token').agg(F.count('overview').alias('tf'))
```

5. Calculate DF

```python
# calculate document frequency
# (note: this reassigns `df`, shadowing the original movies DataFrame)
df = data_tokens.groupBy('token').agg(F.countDistinct('title').alias('df'))
```

6. Calculate IDF - Inverse Doc Frequency

```python
import math
from pyspark.sql.types import FloatType

# utility method for calculating inverse document frequency
def inverse_doc_frequency(doc_frequency):
    return math.log(total_docs / doc_frequency)

# register inverse document frequency as a udf
inverse_doc_frequency_udf = F.udf(inverse_doc_frequency, FloatType())
# calculate the inverse document frequency
idf = df.withColumn('idf', inverse_doc_frequency_udf('df'))
```
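
Note that this is the unsmoothed IDF; every token in `df` appears in at least one document, so the division is safe here. If unseen terms had to be scored, the smoothed variant described earlier would look like this (a sketch, not part of the original pipeline):

```python
# Smoothed IDF: avoids division by zero for terms absent from the corpus
def smoothed_inverse_doc_frequency(doc_frequency):
    return math.log((1 + total_docs) / (1 + doc_frequency))
```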

7. Calculate TF-IDF by multiplying TF with IDF

```python
# calculate tfidf
tfidf = tf.join(idf, 'token').withColumn('tfidf', F.col('tf') * F.col('idf'))
```
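
Since every search joins against `tfidf`, it may be worth caching it in memory so repeated queries don't recompute the whole lineage. This is a suggestion on our part, not taken from the original code:

```python
# keep the tf-idf table in memory across queries
tfidf = tfidf.cache()
tfidf.count()  # materialize the cache eagerly
```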

**Making Search**

```python
def search(query, N):
    # tokenize query into terms
    terms = tokenize(query)
    # create a dataframe with each term as a separate row
    query_tokens = spark.createDataFrame(
        terms, StringType()).withColumnRenamed('value', 'token')
    # get aggregated score and count for each document for all the matched tokens
    result = query_tokens.join(tfidf, 'token').groupBy('title').agg(
        F.sum('tfidf').alias('score_sum'), F.count('tfidf').alias('matched_terms'))
    # calculate document score
    result = result.withColumn('score', F.col(
        'score_sum') * F.col('matched_terms') / len(terms))
    # show top N documents
    result.select('title', 'score').sort(F.col('score').desc()).show(N, False)
```

The results with the highest top-N `score` in the `tfidf` frame are the most similar documents.
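
For example, querying the engine for the top 10 matches (the query string below is hypothetical):

```python
# prints the 10 highest-scoring titles for the query
search("young wizard school magic", 10)
```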

<figure><img src="/files/afaigCfrjFVvMiMl0y9M" alt=""><figcaption></figcaption></figure>

### 5. Serving

To maximize the benefits of our search engine model, we've created a user-friendly REST API endpoint. This endpoint takes a string input and returns a list of movies derived from the model's results.

For a straightforward yet efficient solution, we've chosen FastAPI.

#### Database

To support the backend API written in FastAPI, we created a MySQL database.

<figure><img src="/files/zpd4b0gOhL5YHsdo4rLj" alt=""><figcaption><p>Docker compose script</p></figcaption></figure>

The image above shows the Docker Compose script that creates a MySQL container storing the necessary information about movies.

#### Building API

In this implementation, we've developed a simple REST API using FastAPI, with uvicorn serving as the application runtime.

```python
if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)
```

Before launching the backend, it's essential to initialize a Spark application and load the `tfidf` model, which the `/search` endpoint depends on.

```python
# create spark application
spark = (
    SparkSession.builder.appName("Movies")
    .master("spark://192.168.194.64:7077")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)
sc = spark.sparkContext

# read tfidf
tfidf_path = "../pipeline_data_platform/data/model/tfidf.parquet"
tfidf = spark.read.parquet(tfidf_path)
```

However, the current implementation yields a list of movie titles as the output of the search engine model. For a more practical real-world application, the API's response should include detailed information about the movies rather than merely their titles.

To achieve this, the procedure involves retrieving results from the search engine model, querying the database to obtain corresponding movie details, and subsequently returning the acquired movie information.

To obtain database sessions for movie querying within the endpoint, SQLAlchemy is employed:

```python
engine = create_engine(
    SQLALCHEMY_DATABASE_URL
)

Base = automap_base()
Base.prepare(engine, reflect=True)

def get_db():
    try:
        db = Session(engine)
        yield db
    finally:
        db.close()
```

The `get_db` function yields a database session. Finally, the API endpoint is created as follows:

```python
@app.get("/search")
def search_movie(q: str = Query(), db: Session = Depends(get_db)):
    movie_titles = search(spark, tfidf, q, 10)

    movies = []

    for title in movie_titles:
        movie = db.query(ModelMovie).filter(ModelMovie.title == title).first()
        movies.append(movie)

    return movies
```

Depending on the machine's computational power, the endpoint is expected to return the list of movie details within 5 to 30 seconds.
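
A quick way to exercise the endpoint (a sketch assuming the API is running locally on port 8000, as configured above, and a hypothetical query string):

```python
import requests

# hypothetical query against the local /search endpoint
response = requests.get("http://localhost:8000/search", params={"q": "space adventure"})
for movie in response.json():
    print(movie)
```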

#### Application

We've crafted a movie platform reminiscent of Netflix using Next.js. This platform showcases the movies obtained through our earlier IMDb scraping efforts.

Additionally, we've incorporated a user-friendly search bar. When users input their query and submit it, the platform sends a request to the previously established search engine API. Once the response is received, the platform dynamically renders the movies based on the obtained information.

{% embed url="https://drive.google.com/file/d/1EGNiw6XN5BLP6GMZjiK6B3qXhXoU9E8W/view?usp=drive_link" %}

## Getting started

First, install your Dagster code location as a Python package. By using the `--editable` flag, pip installs your Python package in "editable mode", so local code changes apply automatically as you develop.

```bash
pip install -e ".[dev]"
```

Duplicate the `.env.example` file and rename it to `.env`. Then fill in the values of the environment variables in that file.

Start the Dagster UI web server:

```bash
python -m dagster dev -h 0.0.0.0
```

Open <http://localhost:3000> using your browser to see the project.

## Development

### Adding new Python dependencies

You can specify new Python dependencies in `setup.py`.
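
For instance, adding a dependency might look like this. This is a sketch; the package name and dependency list are illustrative, not the project's actual `setup.py`:

```python
from setuptools import find_packages, setup

setup(
    name="data_platform",  # assumed package name
    packages=find_packages(exclude=["data_platform_tests"]),
    install_requires=[
        "dagster",
        "pandas",
        "pyspark",  # example of a newly added dependency
    ],
    extras_require={"dev": ["dagster-webserver", "pytest"]},
)
```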

### Unit testing

Unit tests are available in the `data_platform_tests` directory, and you can run them using pytest:

```bash
python -m pytest data_platform_tests
```

### Develop with Spark Cluster

#### Build docker images

You need to build two images: one for `dagster-webserver` and `dagster-daemon` (both use the same image), and one for the pipeline.

```bash
docker build -t dagster .
docker build -t pipeline pipeline_data_platform
```

