A Serverless Data Pipeline
Showcases an end-to-end serverless data pipeline that collects posts from Reddit, analyzes their sentiment, and visualizes the results. Leveraging AWS services, the system is highly scalable, cost-effective, and requires minimal maintenance.
The sentiment analysis provides insights into how Reddit users perceive various topics, enabling trend analysis and identification of content patterns.
Successfully analyzed over 1,000 Reddit posts, revealing that 36% express positive sentiment, 47% remain neutral, and 17% show negative sentiment. This distribution — skewed toward neutral and positive content — provides a useful baseline for tracking sentiment shifts on Reddit over time.
End-to-end serverless data pipeline architecture featuring AWS services
# Core sentiment analysis Lambda function
import json
import boto3
import requests
from datetime import datetime
from typing import Dict, List, Tuple
# AWS service clients — created once at module import (Lambda cold start) so
# the client is reused across warm invocations instead of rebuilt per call.
s3_client = boto3.client('s3')
# Destination bucket for the generated CSVs; must match the LOCATION clause of
# the Athena external table that reads this data.
S3_BUCKET = 'reddit-sentiment-quicksight'
class RedditSentimentAnalyzer:
    """Lexicon-based sentiment scorer for Reddit post text.

    Scores are the normalized difference between positive- and negative-lexicon
    word counts, clamped to the closed interval [-1.0, 1.0].
    """

    def __init__(self):
        # Word lists come from a loader defined elsewhere in this module.
        self.positive_lexicon = self._load_lexicon('positive')
        self.negative_lexicon = self._load_lexicon('negative')

    def analyze_sentiment(self, text: str) -> float:
        """Return a sentiment score for *text* in [-1, 1].

        -1 means every token is in the negative lexicon, 1 means every token
        is in the positive lexicon; empty/whitespace-only text scores 0.0.
        """
        tokens = text.lower().split()
        if not tokens:
            # Avoid division by zero on empty input.
            return 0.0

        pos_hits = sum(token in self.positive_lexicon for token in tokens)
        neg_hits = sum(token in self.negative_lexicon for token in tokens)

        raw_score = (pos_hits - neg_hits) / len(tokens)
        # Defensive clamp: hit counts cannot exceed the token count, so this
        # only guards against future lexicon changes (e.g. weighted entries).
        return max(-1.0, min(1.0, raw_score))
def lambda_handler(event, context) -> Dict:
    """Lambda entry point: fetch Reddit posts, score titles, upload CSV to S3.

    Returns an API-Gateway-style response dict: statusCode 200 with the S3 key
    and a summary message on success, statusCode 500 with the error string on
    any failure.
    """
    try:
        analyzer = RedditSentimentAnalyzer()

        # Fetch Reddit posts (helper defined elsewhere in this module).
        posts = fetch_reddit_posts()

        # Capture ONE timestamp for the whole batch. The original code called
        # datetime.now() per post and again for the file name, so rows within
        # a single run carried drifting timestamps that didn't match the key.
        run_time = datetime.now()
        row_timestamp = run_time.isoformat()

        # Score each post title; each row mirrors the Athena table columns.
        analyzed_data = [
            {
                'id': post['id'],
                'title': post['title'],
                'sentiment_score': analyzer.analyze_sentiment(post['title']),
                'timestamp': row_timestamp,
            }
            for post in posts
        ]

        # Serialize to CSV and store in the bucket Athena/QuickSight reads.
        csv_content = generate_csv(analyzed_data)
        s3_key = f"reddit_data_{run_time.strftime('%Y%m%d_%H%M%S')}.csv"
        s3_client.put_object(
            Bucket=S3_BUCKET,
            Key=s3_key,
            Body=csv_content,
            ContentType='text/csv'
        )

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Successfully processed {len(posts)} posts',
                's3_key': s3_key
            })
        }
    except Exception as e:
        # Broad catch is deliberate at the Lambda boundary: report the failure
        # as a 500 response rather than letting the invocation crash.
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
-- External table over the CSVs the Lambda writes to S3. Columns map to the
-- CSV by POSITION (OpenCSVSerde), so the names here need not match the CSV
-- header — id, title, sentiment_score, timestamp in file order.
CREATE EXTERNAL TABLE reddit_sentiment_data (
id STRING,
post_title STRING,
sentiment_score DOUBLE,
post_date TIMESTAMP
)
-- OpenCSVSerde handles quoted fields (titles may contain commas/quotes).
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'separatorChar' = ',',
'quoteChar' = '"',
'escapeChar' = '\\'
)
STORED AS TEXTFILE
-- Must match the S3_BUCKET constant in the Lambda code.
LOCATION 's3://reddit-sentiment-quicksight/'
TBLPROPERTIES (
-- Skip the CSV header row emitted by generate_csv.
'skip.header.line.count'='1',
'classification'='csv'
);
-- Daily sentiment trends: per-day post volume plus mean, spread, and median
-- of the sentiment score. PERCENTILE_APPROX gives an approximate median,
-- which is fine for dashboard use and cheap over large partitions.
SELECT
DATE(post_date) AS date,
COUNT(*) AS total_posts,
AVG(sentiment_score) AS avg_sentiment,
STDDEV(sentiment_score) AS sentiment_stddev,
PERCENTILE_APPROX(sentiment_score, 0.5) AS median_sentiment
FROM
reddit_sentiment_data
GROUP BY
DATE(post_date)
ORDER BY
date DESC;
-- Sentiment distribution: bucket posts into Positive (> 0.2),
-- Negative (< -0.2), or Neutral, with counts and percentage of total.
-- SUM(COUNT(*)) OVER () is the grand total across all buckets.
-- The CASE is repeated in GROUP BY because grouping by a SELECT alias
-- is not portable across Hive/Presto engines.
SELECT
CASE
WHEN sentiment_score > 0.2 THEN 'Positive'
WHEN sentiment_score < -0.2 THEN 'Negative'
ELSE 'Neutral'
END AS sentiment_category,
COUNT(*) AS post_count,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS percentage
FROM
reddit_sentiment_data
GROUP BY
CASE
WHEN sentiment_score > 0.2 THEN 'Positive'
WHEN sentiment_score < -0.2 THEN 'Negative'
ELSE 'Neutral'
END;
-- Top 10 most positive posts, newest first on ties.
-- Threshold is 0.2 (not 0) to match the 'Positive' bucket used in the
-- sentiment-distribution query above; the original > 0 filter also pulled
-- in posts that query classifies as Neutral.
SELECT
id,
post_title,
sentiment_score,
post_date
FROM
reddit_sentiment_data
WHERE
sentiment_score > 0.2
ORDER BY
sentiment_score DESC,
post_date DESC
LIMIT 10;