A Serverless Data Pipeline
Showcases an end-to-end serverless data pipeline that collects posts from Reddit, analyzes their sentiment, and visualizes the results. Leveraging AWS services, the system is highly scalable, cost-effective, and requires minimal maintenance.
The sentiment analysis provides insights into how Reddit users perceive various topics, enabling trend analysis and identification of content patterns.
Successfully analyzed over 1,000 Reddit posts, revealing that 36% express positive sentiment, 47% remain neutral, and 17% show negative sentiment. This balanced distribution provides valuable insights into the content on Reddit.
End-to-end serverless data pipeline architecture featuring AWS services
# Core sentiment analysis Lambda function import json import boto3 import requests from datetime import datetime from typing import Dict, List, Tuple # AWS service clients s3_client = boto3.client('s3') S3_BUCKET = 'reddit-sentiment-quicksight' class RedditSentimentAnalyzer: """Analyzes sentiment of Reddit posts using NLP techniques.""" def __init__(self): self.positive_lexicon = self._load_lexicon('positive') self.negative_lexicon = self._load_lexicon('negative') def analyze_sentiment(self, text: str) -> float: """ Performs sentiment analysis on text using lexicon-based approach. Returns score between -1 (negative) and 1 (positive). """ text_lower = text.lower() words = text_lower.split() positive_count = sum(1 for word in words if word in self.positive_lexicon) negative_count = sum(1 for word in words if word in self.negative_lexicon) total_words = len(words) if total_words == 0: return 0.0 # Calculate normalized sentiment score sentiment_score = (positive_count - negative_count) / total_words return max(-1.0, min(1.0, sentiment_score)) def lambda_handler(event, context) -> Dict: """Main Lambda handler function.""" try: analyzer = RedditSentimentAnalyzer() # Fetch Reddit posts posts = fetch_reddit_posts() # Analyze sentiment for each post analyzed_data = [] for post in posts: sentiment_score = analyzer.analyze_sentiment(post['title']) analyzed_data.append({ 'id': post['id'], 'title': post['title'], 'sentiment_score': sentiment_score, 'timestamp': datetime.now().isoformat() }) # Generate CSV and upload to S3 csv_content = generate_csv(analyzed_data) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') s3_key = f"reddit_data_{timestamp}.csv" s3_client.put_object( Bucket=S3_BUCKET, Key=s3_key, Body=csv_content, ContentType='text/csv' ) return { 'statusCode': 200, 'body': json.dumps({ 'message': f'Successfully processed {len(posts)} posts', 's3_key': s3_key }) } except Exception as e: return { 'statusCode': 500, 'body': json.dumps({'error': str(e)}) }
-- Create external table for Reddit sentiment data CREATE EXTERNAL TABLE reddit_sentiment_data ( id STRING, post_title STRING, sentiment_score DOUBLE, post_date TIMESTAMP ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( 'separatorChar' = ',', 'quoteChar' = '"', 'escapeChar' = '\\' ) STORED AS TEXTFILE LOCATION 's3://reddit-sentiment-quicksight/' TBLPROPERTIES ( 'skip.header.line.count'='1', 'classification'='csv' );
-- Daily sentiment trends SELECT DATE(post_date) AS date, COUNT(*) AS total_posts, AVG(sentiment_score) AS avg_sentiment, STDDEV(sentiment_score) AS sentiment_stddev, PERCENTILE_APPROX(sentiment_score, 0.5) AS median_sentiment FROM reddit_sentiment_data GROUP BY DATE(post_date) ORDER BY date DESC; -- Sentiment distribution analysis SELECT CASE WHEN sentiment_score > 0.2 THEN 'Positive' WHEN sentiment_score < -0.2 THEN 'Negative' ELSE 'Neutral' END AS sentiment_category, COUNT(*) AS post_count, ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS percentage FROM reddit_sentiment_data GROUP BY CASE WHEN sentiment_score > 0.2 THEN 'Positive' WHEN sentiment_score < -0.2 THEN 'Negative' ELSE 'Neutral' END; -- Top trending positive posts SELECT id, post_title, sentiment_score, post_date FROM reddit_sentiment_data WHERE sentiment_score > 0 ORDER BY sentiment_score DESC, post_date DESC LIMIT 10;