Sentiment analysis on streaming Twitter data using Kafka, Spark Structured Streaming & Python (Part 2)

A friend once told me that a problem we face at a given moment is really an assembly of smaller problems we never resolved and weren't even aware of.

Weeks ago, I decided to learn how to do sentiment analysis on streaming Twitter data, but to complete the entire project it was necessary to understand the architectural concept and walk through it step by step. I divided the process into multiple parts. I hope this will give you an overview of every component; just view each component as a Lego piece.

Part 1:

Sentiment analysis: this is a kind of "Hello World" for big data analysis. I wrote an article about the methodology. My goal was to connect to Twitter using its API and get several tweets about a particular topic. The topic I chose was Bitcoin. The project is also available on my GitHub.

Part 2:

After working on a fixed quantity of data, as shown in the first part, I had to start building an architecture for real-time analysis. So understanding the key concepts behind Kafka and Spark Structured Streaming was as important as choosing the language.

Goal

The goal is to do real-time sentiment analysis and store the result in MongoDB. The first part is available here. Now let's dive into the process.

Python packages:

TextBlob: to do simple sentiment analysis on tweets (for demo purposes only).
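As a minimal illustration of what TextBlob returns (the sample sentence below is made up):

from textblob import TextBlob

# polarity is in [-1, 1], subjectivity is in [0, 1]
blob = TextBlob("Bitcoin is doing great today")
print(blob.sentiment.polarity, blob.sentiment.subjectivity)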

Data Processing Engine:

Spark: we will use Spark to process the stream.

Storage:

MongoDB: here we will use MongoDB as the output sink.

Now, let's jump into it.

1. Data Ingestion

  • Create a Kafka producer. The goal is to connect to the Twitter API and get tweets about a particular topic.
import tweepy
from kafka import KafkaProducer
import logging

"""API ACCESS KEYS"""

consumerKey = "XXXX"
consumerSecret = "XXXX"
accessToken = "XXXX"
accessTokenSecret = "XXXX"

producer = KafkaProducer(bootstrap_servers='localhost:9092')
search_term = 'Bitcoin'
topic_name = 'twitter'


def twitterAuth():
    # create the authentication object
    authenticate = tweepy.OAuthHandler(consumerKey, consumerSecret)
    # set the access token and the access token secret
    authenticate.set_access_token(accessToken, accessTokenSecret)
    # create the API object
    api = tweepy.API(authenticate, wait_on_rate_limit=True)
    return api


class TweetListener(tweepy.Stream):

    def on_data(self, raw_data):
        logging.info(raw_data)
        producer.send(topic_name, value=raw_data)
        return True

    def on_error(self, status_code):
        if status_code == 420:
            # returning False disconnects the stream (420 means we are being rate limited)
            return False

    def start_streaming_tweets(self, search_term):
        self.filter(track=search_term, stall_warnings=True, languages=["en"])


if __name__ == '__main__':
    twitter_stream = TweetListener(consumerKey, consumerSecret, accessToken, accessTokenSecret)
    twitter_stream.start_streaming_tweets(search_term)
  • Create a Kafka consumer: it receives the tweets that the producer sent to the topic.
from kafka import KafkaConsumer
import json

topic_name = 'twitter'

consumer = KafkaConsumer(
    topic_name,
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='latest',
     enable_auto_commit=True,
     auto_commit_interval_ms=5000,
     fetch_max_bytes=128,
     max_poll_records=100,
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    # value_deserializer has already parsed the JSON payload into a dict
    tweets = message.value
    print(tweets)

2. The Streaming Pipeline

This will be done with Spark.

  • First import the libraries:
from pyspark.sql import functions as F
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import StringType, StructType, StructField, FloatType
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf
from pyspark.ml.feature import RegexTokenizer
import re
from textblob import TextBlob
  • Create the Spark session and start reading the stream coming from Kafka. This is where the connection to Kafka is established.
spark = SparkSession \
        .builder \
        .appName("TwitterSentimentAnalysis") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
        .getOrCreate()

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "twitter") \
        .load()

Now let's do some transformations.

Here we only keep the "text" field. Since the data comes from Kafka, it arrives as JSON with multiple pieces of information, such as the creation date of the tweet, the username… "text" is the field that holds the tweet's content.

mySchema = StructType([StructField("text", StringType(), True)])
# Keep only the "text" field from the JSON we receive from Kafka. "text" is the tweet produced by a user
values = df.select(from_json(df.value.cast("string"), mySchema).alias("tweet"))
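As a side note (not part of this pipeline, and assuming the standard Twitter API v1.1 payload), the schema could be extended to also capture the creation date and the username mentioned above:

extendedSchema = StructType([
    StructField("created_at", StringType(), True),
    StructField("text", StringType(), True),
    StructField("user", StructType([StructField("screen_name", StringType(), True)]), True)
])
# values = df.select(from_json(df.value.cast("string"), extendedSchema).alias("tweet"))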

Clean Data

The cleaning step removes links, user mentions, punctuation, numbers and hashtags.

def cleanTweet(tweet: str) -> str:
    # remove links
    tweet = re.sub(r'http\S+', '', str(tweet))
    tweet = re.sub(r'bit\.ly/\S+', '', str(tweet))
    tweet = tweet.strip('[link]')

    # remove user mentions (retweets first, then plain mentions)
    tweet = re.sub(r'(RT\s@[A-Za-z]+[A-Za-z0-9-_]+)', '', str(tweet))
    tweet = re.sub(r'(@[A-Za-z]+[A-Za-z0-9-_]+)', '', str(tweet))

    # remove punctuation
    my_punctuation = '!"$%&\'()*+,-./:;<=>?[\\]^_`{|}~•@â'
    tweet = re.sub('[' + my_punctuation + ']+', ' ', str(tweet))

    # remove numbers
    tweet = re.sub(r'([0-9]+)', '', str(tweet))

    # remove hashtags
    tweet = re.sub(r'(#[A-Za-z]+[A-Za-z0-9-_]+)', '', str(tweet))

    return tweet
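A quick sanity check on a made-up sample tweet:

sample = "RT @someone: Bitcoin hits $60,000 today! https://t.co/abc #crypto"
print(cleanTweet(sample))
# prints the text with the URL, the mention, the hashtag, punctuation and digits stripped out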

Here, we wrap the cleaning function in a UDF (User Defined Function) and create a new column containing the cleaned text.

df1 = values.select("tweet.*")
clean_tweets = F.udf(cleanTweet, StringType())
raw_tweets = df1.withColumn('processed_text', clean_tweets(col("text")))

3. Sentiment analysis

First, get the subjectivity, polarity and sentiment:

# Create a function to get the subjectivity
def getSubjectivity(tweet: str) -> float:
    return TextBlob(tweet).sentiment.subjectivity


# Create a function to get the polarity
def getPolarity(tweet: str) -> float:
    return TextBlob(tweet).sentiment.polarity


def getSentiment(polarityValue: float) -> str:
    if polarityValue < 0:
        return 'Negative'
    elif polarityValue == 0:
        return 'Neutral'
    else:
        return 'Positive'

Apply them to the tweets:

subjectivity = F.udf(getSubjectivity, FloatType())
polarity = F.udf(getPolarity, FloatType())
sentiment = F.udf(getSentiment, StringType())

subjectivity_tweets = raw_tweets.withColumn('subjectivity', subjectivity(col("processed_text")))
polarity_tweets = subjectivity_tweets.withColumn("polarity", polarity(col("processed_text")))
sentiment_tweets = polarity_tweets.withColumn("sentiment", sentiment(col("polarity")))

When writing the result to the console, we see something like this:

[Screenshot: console output showing the processed tweets with their subjectivity, polarity and sentiment columns]
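For reference, the console output above can be produced with a console sink along these lines (a minimal sketch; the output mode and options are my assumptions):

query_console = sentiment_tweets.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()
query_console.awaitTermination()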

4. Store data frame into MongoDB Atlas

Here, I have to admit that this wasn't the easiest part, as I am not familiar with MongoDB Atlas and didn't know how to connect Spark to this output sink. I just had to store my output in MongoDB Atlas, which is the best way to deploy, run and scale MongoDB in the cloud. The process is as follows:

  • Go to the website and create an account
  • Create a Cluster
  • Give your IP address access so you have permission to connect to your cluster, as shown in the image below

[Screenshot: MongoDB Atlas IP access list settings]

  • Create a database and a collection

Congratulations, you have done the first part of the work. Now, let's connect Spark to MongoDB. To do this:

  • Go to the overview page

[Screenshot: MongoDB Atlas cluster overview page]

  • Then "connect"

[Screenshot: the MongoDB Atlas "Connect" dialog]

  • Choose your driver and its version, then copy the connection string. Make sure to replace the password, the collection and the database name. You will get something like this:
mongodb+srv://Lorena:<password>@cluster0.9psdq.mongodb.net/myFirstDatabase?retryWrites=true&w=majority

After this, add it to the Spark session and update the Spark session configuration:

spark = SparkSession \
        .builder \
        .appName("TwitterSentimentAnalysis") \
        .config("spark.mongodb.input.uri",
                "mongodb+srv://<Username>:<Password>@cluster0.9psdq.mongodb.net/<database_name>.<database_collection>"
                "?retryWrites=true&w=majority") \
        .config("spark.mongodb.output.uri",
                "mongodb+srv://<Username>:<Password>@cluster0.9psdq.mongodb.net/<database_name>.<database_collection>"
                "?retryWrites=true&w=majority") \
        # both packages go in a single entry: a second config() call with the same key would overwrite the first
        .config("spark.jars.packages",
                "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,"
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
        .getOrCreate()
  • Create a function that writes each micro-batch to MongoDB:
def write_row_in_mongo(df, epoch_id):
    mongoURL = "mongodb+srv://Lorena:<Password>@cluster0.9psdq.mongodb.net/<database>.<collection>" \
               "?retryWrites=true&w=majority"
    df.write.format("mongo").mode("append").option("uri", mongoURL).save()
  • And start writing the stream to MongoDB:
query = sentiment_tweets.writeStream.queryName("test_tweets") \
        .foreachBatch(write_row_in_mongo).start()
query.awaitTermination()

Well done! Now we can see that the data is stored in the database.

[Screenshot: the documents stored in the MongoDB Atlas collection]

Conclusion

To conclude, the goal of this second part was to collect the cleaned tweets and send them to an output sink; the one I chose here is MongoDB. The objective has been reached. I will keep improving it and will continue with the other parts over the next weeks. The project is available on my GitHub.


I hope this information was helpful and interesting. If you have any questions, or you just want to say hi, I'm happy to connect and answer anything you may ask about my blogs! Feel free to visit my website for more!