Last updated: March 16, 2025

Are you a college student looking to enhance your programming portfolio with impressive Python projects? Look no further! Python continues to dominate as one of the most versatile and in-demand programming languages in 2025. Whether you’re a beginner or have intermediate skills, these innovative Python projects will boost your resume, strengthen your coding abilities, and give you practical experience that employers are actively seeking.
Quick Navigation:
- Why Python Projects Are Essential for College Students
- How to Choose the Right Project
- Top 10 Python Projects for College Students
- Tips for Project Success
- FAQs About Python Projects
Why Python Projects Are Essential for College Students {#why-python}
Python’s simplicity and versatility make it the perfect language for college projects. Here’s why building Python projects during your college years is crucial:
- Industry Relevance: Python skills are highly sought after in fields ranging from data science to web development
- Practical Application: Projects transform theoretical knowledge into practical skills
- Portfolio Building: Having completed projects demonstrates your capabilities to potential employers
- Problem-Solving Skills: Project development enhances your ability to solve real-world problems
- Career Advancement: Python expertise opens doors to internships and entry-level positions
How to Choose the Right Project {#how-to-choose}
Before diving into our recommended projects, consider these factors when selecting a Python project:
- Align with your interests: Choose projects in domains you’re passionate about
- Match your skill level: Start with simpler projects if you’re a beginner
- Consider time constraints: Be realistic about what you can accomplish
- Think about uniqueness: Add your own creative twist to stand out
- Focus on marketable skills: Select projects that teach in-demand technologies
Top 10 Python Projects for College Students {#top-10-projects}
1. AI-Powered Personal Study Assistant

Difficulty Level: Intermediate to Advanced
Skills Developed: Natural Language Processing, Machine Learning, API Integration
Project Description: Build an AI-powered study assistant that helps college students manage their learning process. This application uses natural language processing to understand questions, provide explanations, create flashcards from notes, and recommend additional resources based on the student’s learning patterns.
Key Features:
- Question-answering system using transformers
- Automatic summarization of lecture notes
- Personalized study plan generation
- Integration with calendar for study reminders
- Voice command capability
Implementation Steps:
- Set up a Python environment with necessary libraries (transformers, TensorFlow/PyTorch)
- Implement the NLP models for understanding user queries
- Create a knowledge base system for storing and retrieving information
- Develop a simple GUI using Tkinter or a web interface with Flask
- Implement a recommendation system for study resources
- Add voice recognition using libraries like SpeechRecognition
Sample Code Implementation:
# study_assistant.py
import os
import speech_recognition as sr
import datetime
import json
from tkinter import *
from tkinter import messagebox, ttk
from transformers import pipeline, BartTokenizer, BartForConditionalGeneration
import pyttsx3
import webbrowser
class StudyAssistant:
def __init__(self, root):
self.root = root
self.root.title("AI Study Assistant")
self.root.geometry("800x600")
# Initialize NLP components
self.qa_model = pipeline("question-answering")
self.summarizer_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
self.summarizer_model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn")
# Initialize speech components
self.engine = pyttsx3.init()
self.recognizer = sr.Recognizer()
# Initialize knowledge base
self.knowledge_base = self.load_knowledge_base()
# Create UI
self.create_widgets()
def load_knowledge_base(self):
try:
with open("knowledge_base.json", "r") as f:
return json.load(f)
except FileNotFoundError:
# Create default knowledge base if file doesn't exist
default_kb = {
"subjects": [],
"notes": {},
"flashcards": {},
"resources": {}
}
with open("knowledge_base.json", "w") as f:
json.dump(default_kb, f)
return default_kb
def save_knowledge_base(self):
with open("knowledge_base.json", "w") as f:
json.dump(self.knowledge_base, f)
def create_widgets(self):
# Create tabs
self.tab_control = ttk.Notebook(self.root)
self.tab_qa = Frame(self.tab_control)
self.tab_notes = Frame(self.tab_control)
self.tab_flashcards = Frame(self.tab_control)
self.tab_plan = Frame(self.tab_control)
self.tab_control.add(self.tab_qa, text="Ask Questions")
self.tab_control.add(self.tab_notes, text="Notes Manager")
self.tab_control.add(self.tab_flashcards, text="Flashcards")
self.tab_control.add(self.tab_plan, text="Study Plan")
self.tab_control.pack(expand=1, fill="both")
# Question-answering tab
Label(self.tab_qa, text="Ask any study question:", font=("Arial", 14)).pack(pady=10)
self.context_frame = Frame(self.tab_qa)
self.context_frame.pack(pady=10, fill=X, padx=20)
Label(self.context_frame, text="Context (optional):", font=("Arial", 12)).pack(anchor=W)
self.context_text = Text(self.context_frame, height=8)
self.context_text.pack(fill=X)
self.question_frame = Frame(self.tab_qa)
self.question_frame.pack(pady=10, fill=X, padx=20)
Label(self.question_frame, text="Your question:", font=("Arial", 12)).pack(anchor=W)
self.question_entry = Entry(self.question_frame, font=("Arial", 12))
self.question_entry.pack(fill=X)
Button(self.tab_qa, text="Ask", command=self.answer_question, bg="#4CAF50", fg="white", font=("Arial", 12)).pack(pady=10)
Button(self.tab_qa, text="Voice Input", command=self.voice_input, bg="#2196F3", fg="white", font=("Arial", 12)).pack(pady=5)
Label(self.tab_qa, text="Answer:", font=("Arial", 12)).pack(anchor=W, padx=20, pady=5)
self.answer_text = Text(self.tab_qa, height=10, wrap=WORD)
self.answer_text.pack(fill=BOTH, expand=True, padx=20, pady=5)
# Implement other tabs similarly (notes, flashcards, plan)
# ...
def answer_question(self):
question = self.question_entry.get()
context = self.context_text.get("1.0", END)
if not question:
messagebox.showwarning("Input Required", "Please enter a question.")
return
if not context.strip():
# Search knowledge base if no context provided
for subject, notes in self.knowledge_base["notes"].items():
result = self.qa_model(question=question, context=notes)
if result["score"] > 0.7: # Confidence threshold
self.answer_text.delete("1.0", END)
self.answer_text.insert(END, result["answer"])
return
self.answer_text.delete("1.0", END)
self.answer_text.insert(END, "I don't have enough information to answer that question. Please provide some context or add relevant notes to your knowledge base.")
else:
# Use provided context
result = self.qa_model(question=question, context=context)
self.answer_text.delete("1.0", END)
self.answer_text.insert(END, result["answer"])
def voice_input(self):
try:
with sr.Microphone() as source:
self.answer_text.delete("1.0", END)
self.answer_text.insert(END, "Listening... Speak now.")
self.root.update()
audio = self.recognizer.listen(source)
text = self.recognizer.recognize_google(audio)
self.question_entry.delete(0, END)
self.question_entry.insert(0, text)
self.answer_text.delete("1.0", END)
self.answer_text.insert(END, f"Recognized: {text}")
except Exception as e:
self.answer_text.delete("1.0", END)
self.answer_text.insert(END, f"Error: {str(e)}")
def summarize_text(self, text):
inputs = self.summarizer_tokenizer(text, max_length=1024, return_tensors="pt", truncation=True)
summary_ids = self.summarizer_model.generate(inputs["input_ids"], num_beams=4, min_length=30, max_length=100, early_stopping=True)
summary = self.summarizer_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
return summary
# Additional methods for other features would go here
# ...
if __name__ == "__main__":
root = Tk()
app = StudyAssistant(root)
root.mainloop()
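The sample above covers question answering, summarization, and voice input, but it leaves the resource-recommendation step from the implementation list unimplemented. Here is a minimal sketch of that piece, assuming resources are stored as plain dicts with a title and a description; the function name and data shapes are illustrative, not part of the sample app.
# recommender_sketch.py - illustrative only; not wired into the StudyAssistant class
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def recommend_resources(notes_text, resources, top_n=3):
    """Rank resource descriptions by TF-IDF similarity to the student's notes."""
    corpus = [notes_text] + [resource["description"] for resource in resources]
    tfidf = TfidfVectorizer(stop_words="english").fit_transform(corpus)
    # Row 0 is the notes; compare it against every resource description
    scores = cosine_similarity(tfidf[0:1], tfidf[1:]).flatten()
    ranked = sorted(zip(resources, scores), key=lambda pair: pair[1], reverse=True)
    return [resource for resource, _ in ranked[:top_n]]

if __name__ == "__main__":
    notes = "Dijkstra's algorithm finds shortest paths in weighted graphs."
    sample_resources = [
        {"title": "Graph Theory Primer", "description": "shortest paths, Dijkstra, weighted graphs"},
        {"title": "Intro to Baking", "description": "bread, yeast, and oven temperatures"},
    ]
    for resource in recommend_resources(notes, sample_resources):
        print(resource["title"])
TF-IDF plus cosine similarity is enough for a prototype; swapping in sentence embeddings from the transformers library you are already using is a natural upgrade once the basic flow works.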
2. Blockchain-Based Academic Credential Verification System

Difficulty Level: Advanced
Skills Developed: Blockchain, Cryptography, Web Development, Database Management
Project Description: Create a secure system for verifying academic credentials using blockchain technology. This project allows educational institutions to issue digital certificates that can be instantly verified by employers, eliminating certificate fraud and streamlining the verification process.
Key Features:
- Blockchain implementation for storing credential hashes
- Digital signature verification
- QR code generation for easy credential sharing
- Admin portal for educational institutions
- Verification portal for employers
Implementation Steps:
- Set up a Python environment with blockchain libraries (e.g., web3.py)
- Design the database schema for storing credential metadata
- Implement the blockchain component for secure verification
- Create the web interface using Flask or Django
- Develop the digital signature system
- Implement the QR code generation system
Sample Code Implementation:
# app.py - Main Flask application
from flask import Flask, render_template, request, jsonify, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from web3 import Web3
import json
import qrcode
import hashlib
import os
from io import BytesIO
import base64
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import datetime
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///credentials.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# Connect to local Ethereum node (would connect to actual network in production)
w3 = Web3(Web3.HTTPProvider('http://127.0.0.1:8545'))
# Simple blockchain implementation (in production, would use actual Ethereum network)
class SimpleBlockchain:
def __init__(self):
self.chain = []
self.create_genesis_block()
def create_genesis_block(self):
# First block in the chain
self.chain.append({
'index': 1,
'timestamp': str(datetime.datetime.now()),
'data': "Genesis Block",
'prev_hash': "0",
'hash': self.hash_block("Genesis Block")
})
def hash_block(self, data):
# Create SHA-256 hash of block
return hashlib.sha256(str(data).encode()).hexdigest()
def add_credential(self, credential_data):
# Add new credential to blockchain
prev_block = self.chain[-1]
new_block = {
'index': len(self.chain) + 1,
'timestamp': str(datetime.datetime.now()),
'data': credential_data,
'prev_hash': prev_block['hash'],
'hash': self.hash_block(credential_data + prev_block['hash'])
}
self.chain.append(new_block)
return new_block['hash']
def verify_credential(self, credential_hash):
# Check if credential hash exists in blockchain
for block in self.chain:
if block['hash'] == credential_hash:
return True
return False
# Initialize blockchain
blockchain = SimpleBlockchain()
# Database models
class Institution(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
password_hash = db.Column(db.String(200), nullable=False)
public_key = db.Column(db.Text, nullable=False)
credentials = db.relationship('Credential', backref='issuer', lazy=True)
class Credential(db.Model):
id = db.Column(db.Integer, primary_key=True)
student_name = db.Column(db.String(100), nullable=False)
student_id = db.Column(db.String(50), nullable=False)
credential_type = db.Column(db.String(50), nullable=False)
issue_date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
blockchain_hash = db.Column(db.String(200), nullable=False)
digital_signature = db.Column(db.Text, nullable=False)
institution_id = db.Column(db.Integer, db.ForeignKey('institution.id'), nullable=False)
# Routes
@app.route('/')
def index():
return render_template('index.html')
@app.route('/register', methods=['GET', 'POST'])
def register_institution():
if request.method == 'POST':
name = request.form.get('name')
email = request.form.get('email')
password = request.form.get('password')
# Check if institution already exists
if Institution.query.filter_by(email=email).first():
return "Institution already registered"
# Generate RSA key pair
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
public_key = private_key.public_key()
# Serialize public key for storage
public_key_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
# Serialize private key for the institution to download
private_key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
# Create new institution
new_institution = Institution(
name=name,
email=email,
password_hash=generate_password_hash(password),
public_key=public_key_pem
)
db.session.add(new_institution)
db.session.commit()
# In a real application, you would securely provide the private key to the institution
return render_template('private_key.html', private_key=private_key_pem)
return render_template('register.html')
@app.route('/issue', methods=['GET', 'POST'])
def issue_credential():
# In a real app, you would verify the institution is logged in
if request.method == 'POST':
student_name = request.form.get('student_name')
student_id = request.form.get('student_id')
credential_type = request.form.get('credential_type')
institution_id = request.form.get('institution_id')
signature = request.form.get('signature') # Digital signature would be created client-side
# Create credential data
credential_data = f"{student_name}:{student_id}:{credential_type}:{institution_id}:{datetime.datetime.now()}"
# Add to blockchain
blockchain_hash = blockchain.add_credential(credential_data)
# Create new credential record
new_credential = Credential(
student_name=student_name,
student_id=student_id,
credential_type=credential_type,
blockchain_hash=blockchain_hash,
digital_signature=signature,
institution_id=institution_id
)
db.session.add(new_credential)
db.session.commit()
# Generate QR code
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(f"https://example.com/verify/{blockchain_hash}")
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffered = BytesIO()
img.save(buffered)
img_str = base64.b64encode(buffered.getvalue()).decode()
return render_template('credential_issued.html',
credential=new_credential,
qr_code=img_str)
institutions = Institution.query.all()
return render_template('issue.html', institutions=institutions)
@app.route('/verify/<hash>', methods=['GET'])
def verify_credential(hash):
# Check if credential exists in blockchain
is_valid = blockchain.verify_credential(hash)
if is_valid:
# Get credential details from database
credential = Credential.query.filter_by(blockchain_hash=hash).first()
if credential:
return render_template('verify.html',
credential=credential,
institution=credential.issuer,
is_valid=True)
return render_template('verify.html', is_valid=False)
if __name__ == '__main__':
with app.app_context():
db.create_all()
app.run(debug=True)
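The /issue route above accepts a signature from the form but never shows how one is created or checked. Below is a hedged sketch of that missing piece using the same cryptography primitives the app already imports; the helper names are mine, and in a real deployment the private key would never leave the institution's side.
# signature_sketch.py - illustrative signing and verification of credential data
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.exceptions import InvalidSignature

def sign_credential(private_key_pem: str, credential_data: str) -> bytes:
    """Sign the credential string with the institution's RSA private key."""
    private_key = serialization.load_pem_private_key(private_key_pem.encode(), password=None)
    return private_key.sign(
        credential_data.encode(),
        padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
        hashes.SHA256(),
    )

def verify_signature(public_key_pem: str, credential_data: str, signature: bytes) -> bool:
    """Return True if the signature matches the credential data."""
    public_key = serialization.load_pem_public_key(public_key_pem.encode())
    try:
        public_key.verify(
            signature,
            credential_data.encode(),
            padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
            hashes.SHA256(),
        )
        return True
    except InvalidSignature:
        return False

if __name__ == "__main__":
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    priv_pem = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    ).decode()
    pub_pem = key.public_key().public_bytes(
        serialization.Encoding.PEM,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode()
    data = "Jane Doe:12345:BSc Computer Science:1:2025-03-16"
    sig = sign_credential(priv_pem, data)
    print(verify_signature(pub_pem, data, sig))              # True
    print(verify_signature(pub_pem, data + "tampered", sig))  # False
Verification would run on the employer-facing portal, pairing verify_signature with blockchain.verify_credential so both the content and its presence on the chain are checked.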
3. Smart Personal Finance Manager with Predictive Analytics

Difficulty Level: Intermediate
Skills Developed: Data Analysis, Machine Learning, Data Visualization, API Integration
Project Description: Develop a comprehensive personal finance manager that helps college students track expenses, set budgets, and make financial decisions. What sets this project apart is its predictive analytics component that forecasts future expenses based on spending patterns and suggests ways to optimize finances.
Key Features:
- Expense tracking and categorization
- Budget setting and monitoring
- Bank account integration via secure APIs
- Expense prediction using machine learning
- Interactive data visualizations
- Customized financial advice
Implementation Steps:
- Set up a Python environment with data analysis libraries (pandas, NumPy)
- Implement the database for storing financial data
- Create models for expense categorization
- Develop prediction algorithms using scikit-learn
- Build visualization components with Matplotlib or Plotly
- Create a user-friendly interface with Streamlit or Flask
Sample Code Implementation:
# finance_manager.py
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import LabelEncoder
from datetime import datetime, timedelta
import sqlite3
import uuid
import hashlib
import json
import os
class FinanceManager:
def __init__(self):
self.conn = sqlite3.connect('finance.db', check_same_thread=False)
self.create_tables()
def create_tables(self):
cursor = self.conn.cursor()
# Create users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE,
password TEXT,
created_at TIMESTAMP
)
''')
# Create transactions table
cursor.execute('''
CREATE TABLE IF NOT EXISTS transactions (
id TEXT PRIMARY KEY,
user_id TEXT,
date TIMESTAMP,
amount REAL,
category TEXT,
description TEXT,
is_income BOOLEAN,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Create budgets table
cursor.execute('''
CREATE TABLE IF NOT EXISTS budgets (
id TEXT PRIMARY KEY,
user_id TEXT,
category TEXT,
amount REAL,
period TEXT,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
self.conn.commit()
def register_user(self, username, password):
cursor = self.conn.cursor()
user_id = str(uuid.uuid4())
hashed_password = hashlib.sha256(password.encode()).hexdigest()
try:
cursor.execute(
"INSERT INTO users (id, username, password, created_at) VALUES (?, ?, ?, ?)",
(user_id, username, hashed_password, datetime.now())
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
def authenticate_user(self, username, password):
cursor = self.conn.cursor()
hashed_password = hashlib.sha256(password.encode()).hexdigest()
cursor.execute(
"SELECT id FROM users WHERE username = ? AND password = ?",
(username, hashed_password)
)
result = cursor.fetchone()
return result[0] if result else None
def add_transaction(self, user_id, date, amount, category, description, is_income):
cursor = self.conn.cursor()
transaction_id = str(uuid.uuid4())
cursor.execute(
"INSERT INTO transactions (id, user_id, date, amount, category, description, is_income) VALUES (?, ?, ?, ?, ?, ?, ?)",
(transaction_id, user_id, date, amount, category, description, is_income)
)
self.conn.commit()
return transaction_id
def get_transactions(self, user_id, start_date=None, end_date=None):
cursor = self.conn.cursor()
query = "SELECT * FROM transactions WHERE user_id = ?"
params = [user_id]
if start_date:
query += " AND date >= ?"
params.append(start_date)
if end_date:
query += " AND date <= ?"
params.append(end_date)
cursor.execute(query, params)
columns = [col[0] for col in cursor.description]
transactions = [dict(zip(columns, row)) for row in cursor.fetchall()]
return transactions
def set_budget(self, user_id, category, amount, period):
cursor = self.conn.cursor()
budget_id = str(uuid.uuid4())
# Check if budget already exists
cursor.execute(
"SELECT id FROM budgets WHERE user_id = ? AND category = ? AND period = ?",
(user_id, category, period)
)
existing_budget = cursor.fetchone()
if existing_budget:
# Update existing budget
cursor.execute(
"UPDATE budgets SET amount = ? WHERE id = ?",
(amount, existing_budget[0])
)
else:
# Create new budget
cursor.execute(
"INSERT INTO budgets (id, user_id, category, amount, period) VALUES (?, ?, ?, ?, ?)",
(budget_id, user_id, category, amount, period)
)
self.conn.commit()
def get_budgets(self, user_id, period=None):
cursor = self.conn.cursor()
query = "SELECT * FROM budgets WHERE user_id = ?"
params = [user_id]
if period:
query += " AND period = ?"
params.append(period)
cursor.execute(query, params)
columns = [col[0] for col in cursor.description]
budgets = [dict(zip(columns, row)) for row in cursor.fetchall()]
return budgets
def predict_expenses(self, user_id, prediction_months=3):
# Get past transactions
transactions = self.get_transactions(user_id)
if not transactions:
return None
# Convert to DataFrame
df = pd.DataFrame(transactions)
df['date'] = pd.to_datetime(df['date'])
# Filter for expenses only
expenses_df = df[df['is_income'] == 0].copy()
if expenses_df.empty:
return None
# Extract features
expenses_df['month'] = expenses_df['date'].dt.month
expenses_df['year'] = expenses_df['date'].dt.year
expenses_df['day_of_month'] = expenses_df['date'].dt.day
expenses_df['day_of_week'] = expenses_df['date'].dt.dayofweek
# Encode categorical features
le = LabelEncoder()
expenses_df['category_encoded'] = le.fit_transform(expenses_df['category'])
# Group by month and category to get monthly spending per category
monthly_by_category = expenses_df.groupby(['year', 'month', 'category']).agg({
'amount': 'sum',
'category_encoded': 'first'
}).reset_index()
# Prepare training data
X = monthly_by_category[['year', 'month', 'category_encoded']]
y = monthly_by_category['amount']
# Train model
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X, y)
# Generate future dates for prediction
last_date = expenses_df['date'].max()
future_dates = []
categories = expenses_df['category'].unique()
for i in range(1, prediction_months + 1):
future_date = last_date + pd.DateOffset(months=i)
for category in categories:
category_encoded = le.transform([category])[0]
future_dates.append({
'year': future_date.year,
'month': future_date.month,
'category': category,
'category_encoded': category_encoded
})
future_df = pd.DataFrame(future_dates)
# Make predictions
future_df['predicted_amount'] = model.predict(future_df[['year', 'month', 'category_encoded']])
# Format results
predictions = future_df.groupby(['year', 'month']).agg({
'predicted_amount': 'sum'
}).reset_index()
predictions_by_category = future_df.copy()
return {
'total_by_month': predictions.to_dict('records'),
'by_category': predictions_by_category.to_dict('records')
}
def visualize_spending(self, user_id, period="last6months"):
transactions = self.get_transactions(user_id)
if not transactions:
return None
df = pd.DataFrame(transactions)
df['date'] = pd.to_datetime(df['date'])
# Filter by period
now = datetime.now()
if period == "last6months":
start_date = now - timedelta(days=180)
elif period == "last12months":
start_date = now - timedelta(days=365)
elif period == "thisyear":
start_date = datetime(now.year, 1, 1)
else:
start_date = df['date'].min()
df = df[df['date'] >= start_date]
# Separate income and expenses
income_df = df[df['is_income'] == 1]
expenses_df = df[df['is_income'] == 0]
# Monthly totals
df['month_year'] = df['date'].dt.strftime('%Y-%m')
monthly_totals = df.groupby(['month_year', 'is_income']).agg({
'amount': 'sum'
}).reset_index()
# Category breakdown
category_totals = expenses_df.groupby('category').agg({
'amount': 'sum'
}).reset_index().sort_values('amount', ascending=False)
# Time series data
time_series = df.groupby(['date', 'is_income']).agg({
'amount': 'sum'
}).reset_index()
return {
'monthly_totals': monthly_totals.to_dict('records'),
'category_totals': category_totals.to_dict('records'),
'time_series': time_series.to_dict('records')
}
# Streamlit app implementation
def main():
st.set_page_config(page_title="Smart Finance Manager", layout="wide")
finance_manager = FinanceManager()
# Authentication
if 'user_id' not in st.session_state:
st.session_state.user_id = None
if st.session_state.user_id is None:
st.title("Smart Personal Finance Manager")
tab1, tab2 = st.tabs(["Login", "Register"])
with tab1:
st.subheader("Login")
username = st.text_input("Username", key="login_username")
password = st.text_input("Password", type="password", key="login_password")
if st.button("Login"):
user_id = finance_manager.authenticate_user(username, password)
if user_id:
st.session_state.user_id = user_id
                    st.rerun()
else:
st.error("Invalid username or password")
with tab2:
st.subheader("Register")
username = st.text_input("Username", key="register_username")
password = st.text_input("Password", type="password", key="register_password")
confirm_password = st.text_input("Confirm Password", type="password")
if st.button("Register"):
if password != confirm_password:
st.error("Passwords do not match")
elif not username or not password:
st.error("Username and password are required")
else:
success = finance_manager.register_user(username, password)
if success:
st.success("Registration successful. Please log in.")
else:
st.error("Username already exists")
return
# Main application (user is logged in)
st.title("Smart Personal Finance Manager")
if st.button("Logout", key="logout"):
st.session_state.user_id = None
        st.rerun()
tab1, tab2, tab3, tab4 = st.tabs(["Dashboard", "Transactions", "Budget", "Predictions"])
with tab1:
st.header("Dashboard")
# Get visualizations
visualizations = finance_manager.visualize_spending(st.session_state.user_id)
if not visualizations:
st.info("No data to display. Please add some transactions.")
else:
# Convert to DataFrames
monthly_df = pd.DataFrame(visualizations['monthly_totals'])
category_df = pd.DataFrame(visualizations['category_totals'])
time_series_df = pd.DataFrame(visualizations['time_series'])
# Monthly Income vs Expenses
st.subheader("Monthly Income vs Expenses")
monthly_pivot = monthly_df.pivot(index='month_year', columns='is_income', values='amount').reset_index()
monthly_pivot.columns = ['month_year', 'expenses', 'income']
monthly_pivot.fillna(0, inplace=True)
fig1 = px.bar(monthly_pivot, x='month_year', y=['income', 'expenses'],
title="Monthly Income vs Expenses",
labels={'value': 'Amount', 'month_year': 'Month'},
barmode='group',
color_discrete_map={'income': 'green', 'expenses': 'red'})
st.plotly_chart(fig1, use_container_width=True)
# Expense categories
st.subheader("Expense Categories")
fig2 = px.pie(category_df, values='amount', names='category',
title="Expenses by Category")
st.plotly_chart(fig2, use_container_width=True)
# Display recent transactions
transactions = finance_manager.get_transactions(st.session_state.user_id)
recent_transactions = sorted(transactions, key=lambda x: x['date'], reverse=True)[:5]
st.subheader("Recent Transactions")
for tx in recent_transactions:
tx_type = "Income" if tx['is_income'] else "Expense"
amount = f"${tx['amount']:.2f}"
st.write(f"**{tx['date']}** - {tx['category']} ({tx_type}): {amount} - {tx['description']}")
with tab2:
st.header("Manage Transactions")
# Add new transaction
st.subheader("Add New Transaction")
col1, col2 = st.columns(2)
with col1:
date = st.date_input("Date", value=datetime.now())
amount = st.number_input("Amount", min_value=0.01, format="%.2f")
is_income = st.selectbox("Type", options=["Expense", "Income"]) == "Income"
with col2:
categories = ["Food", "Rent", "Transportation", "Entertainment", "Utilities",
"Education", "Healthcare", "Clothing", "Salary", "Gift", "Other"]
category = st.selectbox("Category", options=categories)
description = st.text_input("Description")
if st.button("Add Transaction"):
finance_manager.add_transaction(
st.session_state.user_id,
date,
amount,
category,
description,
is_income
)
st.success("Transaction added successfully!")
            st.rerun()
# View transactions
st.subheader("Transaction History")
transactions = finance_manager.get_transactions(st.session_state.user_id)
if not transactions:
st.info("No transactions found.")
else:
df = pd.DataFrame(transactions)
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values('date', ascending=False)
df['type'] = df['is_income'].apply(lambda x: "Income" if x else "Expense")
df['amount'] = df['amount'].apply(lambda x: f"${x:.2f}")
st.dataframe(df[['date', 'type', 'category', 'amount', 'description']])
with tab3:
st.header("Budget Management")
# Set budget
st.subheader("Set Budget")
col1, col2 = st.columns(2)
with col1:
categories = ["Food", "Rent", "Transportation", "Entertainment", "Utilities",
"Education", "Healthcare", "Clothing", "Other"]
budget_category = st.selectbox("Category", options=categories)
budget_amount = st.number_input("Monthly Budget Amount", min_value=0.01, format="%.2f")
with col2:
budget_period = st.selectbox("Period", options=["Monthly", "Yearly"])
if st.button("Set Budget"):
finance_manager.set_budget(
st.session_state.user_id,
budget_category,
budget_amount,
budget_period
)
st.success("Budget set successfully!")
            st.rerun()
# View budgets
st.subheader("Current Budgets")
budgets = finance_manager.get_budgets(st.session_state.user_id)
if not budgets:
st.info("No budgets set.")
else:
budget_df = pd.DataFrame(budgets)
# Get actual spending
transactions = finance_manager.get_transactions(st.session_state.user_id)
tx_df = pd.DataFrame(transactions)
if not tx_df.empty:
tx_df['date'] = pd.to_datetime(tx_df['date'])
current_month = datetime.now().month
current_year = datetime.now().year
# Filter for current month and expenses
current_month_expenses = tx_df[
(tx_df['date'].dt.month == current_month) &
(tx_df['date'].dt.year == current_year) &
(tx_df['is_income'] == 0)
]
# Group by category
if not current_month_expenses.empty:
category_spending = current_month_expenses.groupby('category')['amount'].sum().reset_index()
# Merge with budgets
budget_df = pd.merge(
budget_df,
category_spending,
on='category',
how='left'
)
budget_df['amount_spent'] = budget_df['amount_y'].fillna(0)
budget_df['budget'] = budget_df['amount_x']
budget_df['remaining'] = budget_df['budget'] - budget_df['amount_spent']
budget_df['percent_used'] = (budget_df['amount_spent'] / budget_df['budget'] * 100).round(2)
# Create progress bars
for _, row in budget_df.iterrows():
st.write(f"**{row['category']}**: ${row['amount_spent']:.2f} of ${row['budget']:.2f} ({row['percent_used']}%)")
progress = min(100, row['percent_used']) / 100
st.progress(progress)
else:
for _, row in budget_df.iterrows():
st.write(f"**{row['category']}**: $0.00 of ${row['amount']:.2f} (0%)")
st.progress(0)
else:
for _, row in budget_df.iterrows():
st.write(f"**{row['category']}**: $0.00 of ${row['amount']:.2f} (0%)")
st.progress(0)
with tab4:
st.header("Expense Predictions")
# Generate predictions
predictions = finance_manager.predict_expenses(st.session_state.user_id)
if not predictions:
st.info("Not enough data for predictions. Please add more transactions.")
else:
# Total monthly predictions
st.subheader("Predicted Monthly Expenses")
monthly_pred_df = pd.DataFrame(predictions['total_by_month'])
monthly_pred_df['month_year'] = monthly_pred_df.apply(
lambda x: f"{x['year']}-{x['month']:02d}", axis=1
)
fig3 = px.bar(
monthly_pred_df,
x='month_year',
y='predicted_amount',
title="Predicted Monthly Expenses",
labels={'predicted_amount': 'Amount', 'month_year': 'Month'}
)
st.plotly_chart(fig3, use_container_width=True)
# Category predictions
st.subheader("Predicted Expenses by Category")
category_pred_df = pd.DataFrame(predictions['by_category'])
# Group by category
category_totals = category_pred_df.groupby('category')['predicted_amount'].sum().reset_index()
fig4 = px.pie(
category_totals,
values='predicted_amount',
names='category',
title="Predicted Expenses by Category"
)
st.plotly_chart(fig4, use_container_width=True)
# Financial insights
st.subheader("Financial Insights")
# Calculate total predicted expenses
total_predicted = monthly_pred_df['predicted_amount'].sum()
# Get average monthly income
transactions = finance_manager.get_transactions(st.session_state.user_id)
tx_df = pd.DataFrame(transactions)
if not tx_df.empty:
tx_df['date'] = pd.to_datetime(tx_df['date'])
income_df = tx_df[tx_df['is_income'] == 1]
if not income_df.empty:
income_df['month_year'] = income_df['date'].dt.strftime('%Y-%m')
monthly_income = income_df.groupby('month_year')['amount'].sum()
avg_monthly_income = monthly_income.mean()
# Calculate predicted savings
avg_monthly_expenses = total_predicted / len(monthly_pred_df)
predicted_savings = avg_monthly_income - avg_monthly_expenses
st.write(f"**Average Monthly Income:** ${avg_monthly_income:.2f}")
st.write(f"**Predicted Average Monthly Expenses:** ${avg_monthly_expenses:.2f}")
if predicted_savings > 0:
st.write(f"**Predicted Monthly Savings:** ${predicted_savings:.2f}")
st.write(f"**Projected Annual Savings:** ${predicted_savings * 12:.2f}")
else:
st.warning(f"**Predicted Monthly Deficit:** ${-predicted_savings:.2f}")
st.write("Consider reducing expenses in the following categories:")
# Suggest categories to reduce
top_categories = category_totals.sort_values('predicted_amount', ascending=False).head(3)
for _, row in top_categories.iterrows():
st.write(f"- {row['category']}: ${row['predicted_amount']:.2f}")
else:
st.info("No income data available for savings prediction.")
else:
st.info("No transaction data available for financial insights.")
if __name__ == "__main__":
main()
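In the Streamlit app above the category is picked from a dropdown, so the "models for expense categorization" step never actually runs. It can be prototyped with a small text classifier trained on past transaction descriptions; here is a rough sketch, assuming you have a few labelled transactions to learn from (the training data below is made up for illustration).
# categorizer_sketch.py - naive Bayes categorization of transaction descriptions
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline

def train_categorizer(descriptions, categories):
    """Fit a bag-of-words + naive Bayes pipeline on labelled transactions."""
    model = make_pipeline(CountVectorizer(), MultinomialNB())
    model.fit(descriptions, categories)
    return model

if __name__ == "__main__":
    # Tiny illustrative training set; a real app would use the user's own history
    descriptions = [
        "campus cafeteria lunch", "grocery store run", "monthly bus pass",
        "movie tickets", "electricity bill", "textbook purchase",
    ]
    categories = ["Food", "Food", "Transportation", "Entertainment", "Utilities", "Education"]
    model = train_categorizer(descriptions, categories)
    print(model.predict(["coffee and sandwich", "train ticket home"]))
Once trained on a user's real history, the predicted category could simply pre-fill the dropdown in the Add Transaction form, keeping the human in the loop.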
Resources:
- Pandas Documentation
- Scikit-learn Documentation
- Plaid API for Banking Integration
- Streamlit Documentation
4. Sustainable Campus Smart Grid Simulation

Difficulty Level: Advanced
Skills Developed: IoT, Data Simulation, Algorithm Design, System Modeling
Project Description: Create a simulation of a smart grid system for a college campus that optimizes energy usage, incorporates renewable energy sources, and reduces overall energy costs. This project combines IoT concepts with algorithmic optimization to create a sustainable energy management system.
Key Features:
- Energy consumption simulation
- Renewable energy integration modeling
- Peak load prediction and management
- Cost optimization algorithms
- Real-time monitoring dashboard
- “What-if” scenario analysis
Implementation Steps:
- Research smart grid systems and sustainable energy concepts
- Design the simulation architecture
- Implement energy consumption models
- Develop optimization algorithms
- Create a visualization dashboard using Dash or Streamlit
- Test with different scenarios to validate the model
Sample Code Implementation:
# smart_grid_simulation.py
import numpy as np
import pandas as pd
import datetime
import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.graph_objects as go
import plotly.express as px
from scipy.optimize import minimize
import json
import os
class SmartGridSimulation:
def __init__(self, config_file=None):
# Load configuration or use defaults
if config_file and os.path.exists(config_file):
with open(config_file, 'r') as f:
self.config = json.load(f)
else:
self.config = {
'campus': {
'buildings': [
{'name': 'Academic Building', 'type': 'academic', 'size': 10000, 'peak_load': 500},
{'name': 'Dormitory A', 'type': 'residential', 'size': 5000, 'peak_load': 300},
{'name': 'Dormitory B', 'type': 'residential', 'size': 5000, 'peak_load': 300},
{'name': 'Sports Complex', 'type': 'recreational', 'size': 8000, 'peak_load': 400},
{'name': 'Library', 'type': 'academic', 'size': 7000, 'peak_load': 350},
{'name': 'Admin Building', 'type': 'administrative', 'size': 3000, 'peak_load': 200},
{'name': 'Cafeteria', 'type': 'food_service', 'size': 2000, 'peak_load': 250},
]
},
'energy_sources': {
'grid': {'max_capacity': 2000, 'cost_per_kwh': 0.12},
'solar': {'capacity': 500, 'installation_area': 2000, 'efficiency': 0.2, 'cost_per_kwh': 0.0},
'wind': {'capacity': 300, 'turbines': 3, 'efficiency': 0.35, 'cost_per_kwh': 0.0},
'battery_storage': {'capacity': 1000, 'max_charge_rate': 100, 'max_discharge_rate': 200, 'efficiency': 0.9}
},
'simulation': {
'time_step': 60, # minutes
'duration': 7, # days
'start_date': '2025-03-01'
}
}
# Initialize simulation state
self.current_time = pd.to_datetime(self.config['simulation']['start_date'])
self.end_time = self.current_time + pd.Timedelta(days=self.config['simulation']['duration'])
self.time_step = pd.Timedelta(minutes=self.config['simulation']['time_step'])
# Battery state of charge (SOC) starts at 50%
self.battery_soc = 0.5 * self.config['energy_sources']['battery_storage']['capacity']
# Simulation results storage
self.results = {
'timestamp': [],
'building_load': {},
'total_load': [],
'solar_generation': [],
'wind_generation': [],
'grid_consumption': [],
'battery_charge': [],
'battery_discharge': [],
'battery_soc': [],
'total_cost': [],
'co2_emissions': []
}
# Initialize building loads
for building in self.config['campus']['buildings']:
self.results['building_load'][building['name']] = []
# Weather data - for a real project, you might use an API or historical data
self.generate_weather_data()
def generate_weather_data(self):
# Generate synthetic weather data for the simulation period
timestamps = pd.date_range(
start=self.current_time,
end=self.end_time,
freq=self.time_step
)
# Solar irradiance (W/m²) - follows daily pattern
base_irradiance = np.zeros(len(timestamps))
for i, ts in enumerate(timestamps):
hour = ts.hour
# Daylight hours (6 AM to 6 PM)
if 6 <= hour < 18:
# Peak at noon
peak_factor = 1 - abs(hour - 12) / 6
base_irradiance[i] = 1000 * peak_factor
# Add some randomness for cloud cover
cloud_factor = 0.7 + 0.3 * np.random.random(len(timestamps))
solar_irradiance = base_irradiance * cloud_factor
# Wind speed (m/s) - more random
wind_base = 5 + 3 * np.sin(np.linspace(0, 4*np.pi, len(timestamps)))
wind_random = 2 * np.random.random(len(timestamps))
wind_speed = wind_base + wind_random
# Temperature (°C) - follows daily pattern
temp_base = np.zeros(len(timestamps))
for i, ts in enumerate(timestamps):
hour = ts.hour
day_temp = 15 + 5 * np.sin(np.pi * (ts.day_of_year % 365) / 182.5) # Seasonal variation
hour_offset = 5 * np.sin(np.pi * (hour - 14) / 12) # Daily variation (peak at 2 PM)
temp_base[i] = day_temp + hour_offset
temp_random = 2 * (np.random.random(len(timestamps)) - 0.5)
temperature = temp_base + temp_random
# Store weather data
self.weather_data = pd.DataFrame({
'timestamp': timestamps,
'solar_irradiance': solar_irradiance,
'wind_speed': wind_speed,
'temperature': temperature
})
def calculate_building_load(self, building, timestamp):
"""Calculate the energy load for a building at a given time."""
hour = timestamp.hour
day_of_week = timestamp.dayofweek
# Base load as percentage of peak load
if building['type'] == 'academic':
if day_of_week < 5: # Weekday
if 8 <= hour < 18: # Working hours
base_load_pct = 0.7 + 0.3 * np.random.random()
elif 18 <= hour < 22: # Evening classes
base_load_pct = 0.4 + 0.2 * np.random.random()
else: # Night hours
base_load_pct = 0.2 + 0.1 * np.random.random()
else: # Weekend
if 10 <= hour < 16: # Weekend activity
base_load_pct = 0.3 + 0.2 * np.random.random()
else:
base_load_pct = 0.1 + 0.1 * np.random.random()
elif building['type'] == 'residential':
if 7 <= hour < 9 or 17 <= hour < 23: # Morning/Evening peak
base_load_pct = 0.8 + 0.2 * np.random.random()
elif 23 <= hour or hour < 7: # Night
base_load_pct = 0.4 + 0.1 * np.random.random()
else: # Day
base_load_pct = 0.3 + 0.2 * np.random.random()
elif building['type'] == 'recreational':
if day_of_week < 5: # Weekday
if 16 <= hour < 21: # After classes
base_load_pct = 0.7 + 0.3 * np.random.random()
elif 9 <= hour < 16: # During day
base_load_pct = 0.4 + 0.2 * np.random.random()
else:
base_load_pct = 0.1 + 0.1 * np.random.random()
else: # Weekend
if 10 <= hour < 20: # Weekend activity
base_load_pct = 0.8 + 0.2 * np.random.random()
else:
base_load_pct = 0.1 + 0.1 * np.random.random()
elif building['type'] == 'food_service':
if 7 <= hour < 10 or 11 <= hour < 14 or 17 <= hour < 20: # Meal times
base_load_pct = 0.9 + 0.1 * np.random.random()
elif 6 <= hour < 7 or 10 <= hour < 11 or 14 <= hour < 17 or 20 <= hour < 21: # Prep times
base_load_pct = 0.6 + 0.2 * np.random.random()
else:
base_load_pct = 0.2 + 0.1 * np.random.random()
else: # Administrative and others
if day_of_week < 5: # Weekday
if 8 <= hour < 17: # Working hours
base_load_pct = 0.7 + 0.2 * np.random.random()
else:
base_load_pct = 0.1 + 0.1 * np.random.random()
else: # Weekend
base_load_pct = 0.1 + 0.1 * np.random.random()
# Adjust for temperature effect on HVAC
weather_at_time = self.weather_data[self.weather_data['timestamp'] == timestamp]
if not weather_at_time.empty:
temp = weather_at_time.iloc[0]['temperature']
# HVAC load increases as temperature deviates from comfort zone (20-22°C)
temp_effect = 0.1 * max(0, abs(temp - 21) - 2) / 10
base_load_pct += temp_effect
return building['peak_load'] * min(1.0, base_load_pct)
def calculate_renewable_generation(self, timestamp):
"""Calculate renewable energy generation based on weather conditions."""
weather_at_time = self.weather_data[self.weather_data['timestamp'] == timestamp]
if weather_at_time.empty:
return 0, 0
weather = weather_at_time.iloc[0]
# Solar generation
solar_config = self.config['energy_sources']['solar']
solar_irradiance = weather['solar_irradiance'] # W/m²
solar_generation = (
solar_irradiance * solar_config['installation_area'] *
solar_config['efficiency'] / 1000 # Convert to kW
)
# Wind generation
wind_config = self.config['energy_sources']['wind']
wind_speed = weather['wind_speed'] # m/s
# Simple wind turbine model
# No generation below cut-in speed (3 m/s) or above cut-out speed (25 m/s)
if wind_speed < 3 or wind_speed > 25:
wind_generation = 0
else:
# Simplified power curve
if wind_speed < 12:
# Ramp up from cut-in to rated speed
power_factor = (wind_speed - 3) / 9
else:
# Rated power from rated speed to cut-out
power_factor = 1.0
wind_generation = (
wind_config['capacity'] * wind_config['turbines'] *
power_factor * wind_config['efficiency']
)
return solar_generation, wind_generation
def optimize_energy_dispatch(self, total_load, solar_gen, wind_gen):
"""Optimize the energy dispatch between different sources."""
renewable_gen = solar_gen + wind_gen
battery_config = self.config['energy_sources']['battery_storage']
# If renewables exceed load, charge battery with excess
if renewable_gen >= total_load:
excess_energy = renewable_gen - total_load
battery_charge = min(
excess_energy,
battery_config['max_charge_rate'],
battery_config['capacity'] - self.battery_soc
)
battery_discharge = 0
grid_consumption = 0
else:
# Need additional power
energy_deficit = total_load - renewable_gen
# Optimize between grid and battery discharge
# Simple strategy: Use battery first if SOC is good, then grid
if self.battery_soc > 0.2 * battery_config['capacity']:
battery_discharge = min(
energy_deficit,
battery_config['max_discharge_rate'],
self.battery_soc
)
grid_consumption = energy_deficit - battery_discharge
else:
# Battery SOC too low, use grid instead
battery_discharge = 0
grid_consumption = energy_deficit
battery_charge = 0
# Update battery SOC
self.battery_soc = (
self.battery_soc +
battery_charge * battery_config['efficiency'] -
battery_discharge / battery_config['efficiency']
)
# Calculate cost
grid_cost = grid_consumption * self.config['energy_sources']['grid']['cost_per_kwh']
# Calculate CO2 emissions (typical grid emissions factor: 0.5 kg CO2/kWh)
co2_emissions = grid_consumption * 0.5 # kg CO2
return {
'grid_consumption': grid_consumption,
'battery_charge': battery_charge,
'battery_discharge': battery_discharge,
'battery_soc': self.battery_soc,
'cost': grid_cost,
'co2_emissions': co2_emissions
}
def run_simulation_step(self):
"""Run a single step of the simulation."""
# Calculate building loads
building_loads = {}
for building in self.config['campus']['buildings']:
load = self.calculate_building_load(building, self.current_time)
building_loads[building['name']] = load
total_load = sum(building_loads.values())
# Calculate renewable generation
solar_gen, wind_gen = self.calculate_renewable_generation(self.current_time)
# Optimize energy dispatch
dispatch = self.optimize_energy_dispatch(total_load, solar_gen, wind_gen)
# Store results
self.results['timestamp'].append(self.current_time)
for building_name, load in building_loads.items():
self.results['building_load'][building_name].append(load)
self.results['total_load'].append(total_load)
self.results['solar_generation'].append(solar_gen)
self.results['wind_generation'].append(wind_gen)
self.results['grid_consumption'].append(dispatch['grid_consumption'])
self.results['battery_charge'].append(dispatch['battery_charge'])
self.results['battery_discharge'].append(dispatch['battery_discharge'])
self.results['battery_soc'].append(dispatch['battery_soc'])
self.results['total_cost'].append(dispatch['cost'])
self.results['co2_emissions'].append(dispatch['co2_emissions'])
# Advance time
self.current_time += self.time_step
def run_simulation(self):
"""Run the complete simulation."""
while self.current_time <= self.end_time:
self.run_simulation_step()
def get_results_df(self):
"""Convert results to a pandas DataFrame."""
df = pd.DataFrame({
'timestamp': self.results['timestamp'],
'total_load': self.results['total_load'],
'solar_generation': self.results['solar_generation'],
'wind_generation': self.results['wind_generation'],
'grid_consumption': self.results['grid_consumption'],
'battery_charge': self.results['battery_charge'],
'battery_discharge': self.results['battery_discharge'],
'battery_soc': self.results['battery_soc'],
'total_cost': self.results['total_cost'],
'co2_emissions': self.results['co2_emissions']
})
# Add building loads
for building_name, loads in self.results['building_load'].items():
df[f'load_{building_name}'] = loads
return df
def run_scenario(self, scenario_config):
"""Run a what-if scenario with modified parameters."""
# Create a copy of the current configuration
        original_config = json.loads(json.dumps(self.config))  # deep copy; .copy() is shallow, so nested scenario edits would leak into it
# Update with scenario parameters
for category, params in scenario_config.items():
if category in self.config:
if isinstance(self.config[category], dict):
for key, value in params.items():
if key in self.config[category]:
self.config[category][key] = value
# Re-initialize and run simulation
        # Persist the merged scenario config and re-initialize from it; calling
        # __init__ with no config file would silently fall back to the defaults
        with open('scenario_config.json', 'w') as f:
            json.dump(self.config, f)
        self.__init__(config_file='scenario_config.json')
self.run_simulation()
# Get results
results = self.get_results_df()
# Restore original configuration
self.config = original_config
return results
# Dashboard Application
def create_dashboard(simulation):
app = dash.Dash(__name__, title="Campus Smart Grid Simulation")
# Run initial simulation
simulation.run_simulation()
results_df = simulation.get_results_df()
# Get building names for dropdown
building_names = [building['name'] for building in simulation.config['campus']['buildings']]
app.layout = html.Div([
html.H1("Sustainable Campus Smart Grid Simulation",
style={'textAlign': 'center', 'color': '#2a3f5f', 'marginBottom': 20}),
html.Div([
html.Div([
html.H3("Simulation Controls", style={'color': '#2a3f5f'}),
html.Label("Simulation Duration (days):"),
dcc.Slider(
id='duration-slider',
min=1,
max=30,
value=simulation.config['simulation']['duration'],
marks={i: f'{i}' for i in range(0, 31, 5)},
step=1
),
html.Label("Solar Capacity (kW):"),
dcc.Slider(
id='solar-slider',
min=0,
max=1000,
value=simulation.config['energy_sources']['solar']['capacity'],
marks={i: f'{i}' for i in range(0, 1001, 200)},
step=50
),
html.Label("Wind Capacity per Turbine (kW):"),
dcc.Slider(
id='wind-slider',
min=0,
max=300,
value=simulation.config['energy_sources']['wind']['capacity'],
marks={i: f'{i}' for i in range(0, 301, 50)},
step=10
),
html.Label("Number of Wind Turbines:"),
dcc.Slider(
id='turbines-slider',
min=0,
max=10,
value=simulation.config['energy_sources']['wind']['turbines'],
marks={i: f'{i}' for i in range(0, 11)},
step=1
),
html.Label("Battery Storage Capacity (kWh):"),
dcc.Slider(
id='battery-slider',
min=0,
max=2000,
value=simulation.config['energy_sources']['battery_storage']['capacity'],
marks={i: f'{i}' for i in range(0, 2001, 500)},
step=100
),
html.Button('Run Simulation', id='run-button',
style={'backgroundColor': '#4CAF50', 'color': 'white',
'padding': '10px 20px', 'margin': '20px 0',
'border': 'none', 'borderRadius': '4px',
'cursor': 'pointer'}),
], style={'width': '30%', 'display': 'inline-block', 'padding': '20px',
'backgroundColor': '#f8f9fa', 'borderRadius': '10px',
'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)'}),
html.Div([
html.H3("Simulation Results", style={'color': '#2a3f5f'}),
dcc.Tabs([
dcc.Tab(label="Energy Overview", children=[
dcc.Graph(id='energy-overview-graph')
]),
dcc.Tab(label="Building Loads", children=[
html.Label("Select Building:"),
dcc.Dropdown(
id='building-dropdown',
options=[{'label': name, 'value': name} for name in building_names],
value=building_names[0]
),
dcc.Graph(id='building-load-graph')
]),
dcc.Tab(label="Battery State", children=[
dcc.Graph(id='battery-graph')
]),
dcc.Tab(label="Cost & Emissions", children=[
dcc.Graph(id='cost-emissions-graph')
])
])
], style={'width': '65%', 'display': 'inline-block', 'float': 'right',
'padding': '20px', 'backgroundColor': '#f8f9fa',
'borderRadius': '10px', 'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)'})
]),
html.Div([
html.H3("Summary Statistics", style={'color': '#2a3f5f', 'marginTop': '30px'}),
html.Div(id='summary-stats', style={'padding': '15px', 'backgroundColor': '#e9ecef',
'borderRadius': '5px', 'marginTop': '10px'})
], style={'marginTop': '30px', 'clear': 'both', 'padding': '20px',
'backgroundColor': '#f8f9fa', 'borderRadius': '10px',
'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)'})
], style={'maxWidth': '1200px', 'margin': '0 auto', 'padding': '20px'})
@app.callback(
[Output('energy-overview-graph', 'figure'),
Output('building-load-graph', 'figure'),
Output('battery-graph', 'figure'),
Output('cost-emissions-graph', 'figure'),
Output('summary-stats', 'children')],
[Input('run-button', 'n_clicks'),
Input('building-dropdown', 'value')],
[State('duration-slider', 'value'),
State('solar-slider', 'value'),
State('wind-slider', 'value'),
State('turbines-slider', 'value'),
State('battery-slider', 'value')]
)
def update_graphs(n_clicks, selected_building, duration, solar_capacity,
wind_capacity, num_turbines, battery_capacity):
nonlocal results_df
# If run button was clicked, update simulation parameters and rerun
if n_clicks is not None:
scenario_config = {
'simulation': {
'duration': duration
},
'energy_sources': {
'solar': {
'capacity': solar_capacity
},
'wind': {
'capacity': wind_capacity,
'turbines': num_turbines
},
'battery_storage': {
'capacity': battery_capacity
}
}
}
results_df = simulation.run_scenario(scenario_config)
# Energy Overview Graph
energy_fig = go.Figure()
energy_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df['total_load'],
name='Total Load', line=dict(color='red', width=2)
))
energy_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df['solar_generation'],
name='Solar Generation', line=dict(color='orange', width=2)
))
energy_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df['wind_generation'],
name='Wind Generation', line=dict(color='blue', width=2)
))
energy_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df['grid_consumption'],
name='Grid Consumption', line=dict(color='purple', width=2)
))
energy_fig.update_layout(
title='Energy Overview',
xaxis_title='Time',
yaxis_title='Power (kW)',
legend=dict(orientation='h', y=1.1),
height=400
)
# Building Load Graph
building_fig = go.Figure()
building_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df[f'load_{selected_building}'],
name=f'{selected_building} Load', line=dict(color='green', width=2)
))
building_fig.update_layout(
title=f'{selected_building} Energy Consumption',
xaxis_title='Time',
yaxis_title='Power (kW)',
height=400
)
# Battery Graph
battery_fig = go.Figure()
battery_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df['battery_soc'],
name='Battery State of Charge', line=dict(color='blue', width=2)
))
battery_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df['battery_charge'],
name='Battery Charging', line=dict(color='green', width=2)
))
battery_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df['battery_discharge'],
name='Battery Discharging', line=dict(color='red', width=2)
))
battery_fig.update_layout(
title='Battery Operation',
xaxis_title='Time',
yaxis_title='Power (kW) / Energy (kWh)',
legend=dict(orientation='h', y=1.1),
height=400
)
# Cost & Emissions Graph
cost_emissions_fig = go.Figure()
cost_emissions_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df['total_cost'],
name='Energy Cost', line=dict(color='green', width=2),
yaxis='y'
))
cost_emissions_fig.add_trace(go.Scatter(
x=results_df['timestamp'], y=results_df['co2_emissions'],
name='CO2 Emissions', line=dict(color='gray', width=2),
yaxis='y2'
))
cost_emissions_fig.update_layout(
title='Cost and Emissions',
xaxis_title='Time',
yaxis=dict(title='Cost ($)', side='left', showgrid=False),
yaxis2=dict(title='CO2 Emissions (kg)', side='right', overlaying='y', showgrid=False),
legend=dict(orientation='h', y=1.1),
height=400
)
# Summary Statistics
total_energy_demand = results_df['total_load'].sum() * simulation.time_step.total_seconds() / 3600 # kWh
total_solar = results_df['solar_generation'].sum() * simulation.time_step.total_seconds() / 3600 # kWh
total_wind = results_df['wind_generation'].sum() * simulation.time_step.total_seconds() / 3600 # kWh
total_grid = results_df['grid_consumption'].sum() * simulation.time_step.total_seconds() / 3600 # kWh
total_cost = results_df['total_cost'].sum()
total_emissions = results_df['co2_emissions'].sum()
renewable_percentage = ((total_solar + total_wind) / total_energy_demand) * 100 if total_energy_demand > 0 else 0
summary_stats = html.Div([
html.Div([
html.Div([
html.H4("Energy Demand"),
html.P(f"{total_energy_demand:.2f} kWh")
], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'}),
html.Div([
html.H4("Renewable Energy"),
html.P(f"{total_solar + total_wind:.2f} kWh ({renewable_percentage:.1f}%)")
], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'}),
html.Div([
html.H4("Grid Energy"),
html.P(f"{total_grid:.2f} kWh")
], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'}),
html.Div([
html.H4("Total Cost"),
html.P(f"${total_cost:.2f}")
], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'}),
html.Div([
html.H4("CO2 Emissions"),
html.P(f"{total_emissions:.2f} kg")
], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'})
])
])
return energy_fig, building_fig, battery_fig, cost_emissions_fig, summary_stats
return app
if __name__ == "__main__":
# Create simulation
simulation = SmartGridSimulation()
# Create and run dashboard
app = create_dashboard(simulation)
app.run_server(debug=True)
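The dispatch logic in optimize_energy_dispatch is a fixed rule (battery first, then grid), and the scipy.optimize import goes unused. If you want a genuine optimization step, one option is to pose each time step as a small linear program. The sketch below assumes a one-hour step and is an alternative formulation for illustration, not the simulation's actual method.
# dispatch_lp_sketch.py - one time step of energy dispatch posed as a linear program
from scipy.optimize import linprog

def optimal_dispatch(load_kw, renewable_kw, soc_kwh, capacity_kwh,
                     max_charge_kw, max_discharge_kw, grid_price):
    """Minimize grid cost for a one-hour step (so kW and kWh are interchangeable).

    Decision variables: [grid_import, battery_discharge, battery_charge, curtailment].
    """
    c = [grid_price, 0.0, 0.0, 0.0]  # only grid energy costs money
    # Power balance: grid + discharge - charge - curtailed_renewables = load - renewables
    A_eq = [[1.0, 1.0, -1.0, -1.0]]
    b_eq = [load_kw - renewable_kw]
    bounds = [
        (0, None),                                        # grid import
        (0, min(max_discharge_kw, soc_kwh)),              # discharge limited by rate and stored energy
        (0, min(max_charge_kw, capacity_kwh - soc_kwh)),  # charge limited by rate and headroom
        (0, renewable_kw),                                # renewables we are allowed to waste
    ]
    result = linprog(c, A_eq=A_eq, b_eq=b_eq, bounds=bounds)
    grid, discharge, charge, curtailed = result.x
    return {"grid": grid, "discharge": discharge, "charge": charge,
            "curtailed": curtailed, "cost": result.fun}

if __name__ == "__main__":
    print(optimal_dispatch(load_kw=800, renewable_kw=300, soc_kwh=400, capacity_kwh=1000,
                           max_charge_kw=100, max_discharge_kw=200, grid_price=0.12))
Extending this across multiple time steps, so the battery can pre-charge before an expensive peak, is where an optimizer genuinely beats the rule-based dispatch.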
5. Augmented Reality Campus Navigation System

Difficulty Level: Advanced
Skills Developed: Computer Vision, AR Development, Geospatial Programming, Mobile Integration
Project Description: Build an augmented reality application that helps students navigate their college campus. This system overlays directional information, points of interest, and real-time data (like crowdedness of study spaces) onto a live camera view, creating an intuitive navigation experience.
Key Features:
- AR-based navigation with directional arrows
- Building and room identification
- Shortest path algorithms
- Points of interest with information overlays
- Real-time updates on available facilities
- Indoor positioning system
Implementation Steps:
- Set up the AR development environment (ARCore or ARKit integration)
- Create the campus map database with GPS coordinates
- Implement computer vision for landmark recognition
- Develop pathfinding algorithms (a minimal sketch follows this list)
- Build the AR interface with information overlays
- Create a backend server for real-time data
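Before the full sample below, here is a hedged sketch of the pathfinding step using the networkx and geopy packages the sample imports. The node names and coordinates mirror the sample data further down, but the graph edges are assumptions for illustration.
# pathfinding_sketch.py - shortest walking route between campus nodes
import networkx as nx
from geopy.distance import geodesic

def build_campus_graph(nodes, edges):
    """nodes: {name: (lat, lon)}; edges: [(a, b), ...]; edge weight is distance in metres."""
    graph = nx.Graph()
    for a, b in edges:
        distance_m = geodesic(nodes[a], nodes[b]).meters
        graph.add_edge(a, b, weight=distance_m)
    return graph

if __name__ == "__main__":
    nodes = {
        "Main Entrance": (40.7133, -74.0062),
        "Library": (40.7127, -74.0050),
        "Science Center": (40.7130, -74.0065),
        "Student Union": (40.7125, -74.0055),
    }
    edges = [
        ("Main Entrance", "Science Center"),
        ("Main Entrance", "Student Union"),
        ("Student Union", "Library"),
        ("Science Center", "Library"),
    ]
    graph = build_campus_graph(nodes, edges)
    route = nx.shortest_path(graph, "Main Entrance", "Library", weight="weight")
    metres = nx.shortest_path_length(graph, "Main Entrance", "Library", weight="weight")
    print(" -> ".join(route), f"({metres:.0f} m)")
In the AR client, each node along the returned route would be projected into the camera view as a directional arrow.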
Sample Code Implementation:
# campus_ar_navigation.py
from flask import Flask, request, jsonify
import numpy as np
import cv2
import json
import networkx as nx
import math
import sqlite3
import datetime
import os
from geopy.distance import geodesic
app = Flask(__name__)
# Database setup
def init_db():
conn = sqlite3.connect('campus_navigation.db')
cursor = conn.cursor()
# Create buildings table
cursor.execute('''
CREATE TABLE IF NOT EXISTS buildings (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
image_path TEXT,
building_type TEXT,
floors INTEGER DEFAULT 1
)
''')
# Create rooms table
cursor.execute('''
CREATE TABLE IF NOT EXISTS rooms (
id INTEGER PRIMARY KEY,
building_id INTEGER,
room_number TEXT NOT NULL,
name TEXT,
floor INTEGER,
latitude REAL,
longitude REAL,
capacity INTEGER,
room_type TEXT,
FOREIGN KEY (building_id) REFERENCES buildings (id)
)
''')
# Create points of interest table
cursor.execute('''
CREATE TABLE IF NOT EXISTS points_of_interest (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
poi_type TEXT,
latitude REAL NOT NULL,
longitude REAL NOT NULL,
altitude REAL,
building_id INTEGER,
image_path TEXT,
FOREIGN KEY (building_id) REFERENCES buildings (id)
)
''')
# Create paths table for navigation
cursor.execute('''
CREATE TABLE IF NOT EXISTS paths (
id INTEGER PRIMARY KEY,
start_point_id INTEGER NOT NULL,
end_point_id INTEGER NOT NULL,
path_type TEXT,
distance REAL,
indoor BOOLEAN,
accessible BOOLEAN DEFAULT 1
)
''')
# Create facility status table
cursor.execute('''
CREATE TABLE IF NOT EXISTS facility_status (
id INTEGER PRIMARY KEY,
facility_id INTEGER NOT NULL,
status TEXT,
occupancy_count INTEGER,
last_updated TIMESTAMP,
FOREIGN KEY (facility_id) REFERENCES points_of_interest (id)
)
''')
conn.commit()
conn.close()
# Initialize database
init_db()
# Load sample data for testing
def load_sample_data():
conn = sqlite3.connect('campus_navigation.db')
cursor = conn.cursor()
# Check if data already exists
cursor.execute("SELECT COUNT(*) FROM buildings")
if cursor.fetchone()[0] > 0:
conn.close()
return
# Sample buildings
buildings = [
(1, "Main Academic Building", "Central academic building with classrooms and offices", 40.7128, -74.0060, 10, "main_academic.jpg", "academic", 4),
(2, "Science Center", "Houses labs and research facilities", 40.7130, -74.0065, 8, "science_center.jpg", "academic", 3),
(3, "Student Union", "Student services and recreation", 40.7125, -74.0055, 5, "student_union.jpg", "services", 2),
(4, "Library", "Main campus library", 40.7127, -74.0050, 12, "library.jpg", "academic", 5),
(5, "Dormitory A", "Undergraduate housing", 40.7135, -74.0070, 15, "dorm_a.jpg", "residential", 8),
(6, "Sports Complex", "Athletic facilities and gym", 40.7120, -74.0080, 3, "sports.jpg", "recreational", 2)
]
cursor.executemany('''
INSERT INTO buildings (id, name, description, latitude, longitude, altitude, image_path, building_type, floors)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', buildings)
# Sample rooms
rooms = [
(1, 1, "101", "Lecture Hall", 1, 40.7128, -74.0060, 120, "classroom"),
(2, 1, "201", "Computer Lab", 2, 40.7128, -74.0060, 30, "lab"),
(3, 2, "105", "Chemistry Lab", 1, 40.7130, -74.0065, 25, "lab"),
(4, 3, "150", "Cafeteria", 1, 40.7125, -74.0055, 200, "dining"),
(5, 4, "220", "Study Area", 2, 40.7127, -74.0050, 50, "study"),
(6, 4, "320", "Quiet Zone", 3, 40.7127, -74.0050, 30, "study")
]
cursor.executemany('''
INSERT INTO rooms (id, building_id, room_number, name, floor, latitude, longitude, capacity, room_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', rooms)
# Sample points of interest
pois = [
(1, "Main Entrance", "Primary entrance to campus", "entrance", 40.7133, -74.0062, 2, None, "main_entrance.jpg"),
(2, "Coffee Shop", "Campus coffee shop", "food", 40.7126, -74.0059, 5, 3, "coffee_shop.jpg"),
(3, "Campus Shuttle Stop", "Shuttle service stop", "transportation", 40.7129, -74.0070, 2, None, "shuttle_stop.jpg"),
(4, "Info Kiosk", "Information booth", "information", 40.7130, -74.0061, 3, None, "info_kiosk.jpg"),
(5, "ATM", "Campus ATM", "service", 40.7125, -74.0058, 4, 3, "atm.jpg"),
(6, "Bicycle Rack", "Bicycle parking", "transportation", 40.7127, -74.0063, 2, None, "bike_rack.jpg")
]
cursor.executemany('''
INSERT INTO points_of_interest (id, name, description, poi_type, latitude, longitude, altitude, building_id, image_path)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', pois)
# Sample paths
paths = [
(1, 1, 2, "walkway", 100.5, False, True),
(2, 1, 3, "walkway", 150.2, False, True),
(3, 2, 3, "walkway", 120.8, False, True),
(4, 2, 4, "walkway", 200.3, False, True),
(5, 3, 4, "walkway", 180.1, False, True),
(6, 3, 5, "walkway", 250.7, False, True),
(7, 4, 5, "walkway", 220.4, False, True),
(8, 5, 6, "walkway", 300.9, False, True)
]
cursor.executemany('''
INSERT INTO paths (id, start_point_id, end_point_id, path_type, distance, indoor, accessible)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', paths)
# Sample facility status
facility_statuses = [
(1, 2, "Open", 15, datetime.datetime.now()),
(2, 3, "Active", 0, datetime.datetime.now()),
(3, 5, "Available", 0, datetime.datetime.now()),
(4, 6, "Available", 12, datetime.datetime.now())
]
cursor.executemany('''
INSERT INTO facility_status (id, facility_id, status, occupancy_count, last_updated)
VALUES (?, ?, ?, ?, ?)
''', facility_statuses)
conn.commit()
conn.close()
# Load sample data
load_sample_data()
# Computer Vision Functions
class LandmarkRecognizer:
def __init__(self, model_path='landmark_model.h5', landmarks_db='landmarks.json'):
# In a real implementation, load a trained model
# For this example, we'll simulate recognition with a simplified approach
self.landmarks = self._load_landmarks(landmarks_db)
def _load_landmarks(self, landmarks_db):
# Load landmark data from JSON file or use hardcoded data for example
if os.path.exists(landmarks_db):
with open(landmarks_db, 'r') as f:
return json.load(f)
else:
# Sample landmark data for demonstration
return {
"building_1": {
"features": [0.2, 0.3, 0.1, 0.5, 0.7], # Simplified feature representation
"building_id": 1
},
"building_2": {
"features": [0.8, 0.2, 0.6, 0.3, 0.1],
"building_id": 2
},
# Add more landmarks...
}
def recognize(self, image):
"""
Recognize landmarks in an image.
In a real implementation, this would use a trained model.
For this example, we'll return a random landmark with confidence.
"""
# Convert image to numpy array if it's not already
if isinstance(image, str):
image = cv2.imread(image)
# In a real implementation, process the image and extract features
# For this example, we'll simulate recognition
# Random landmark with confidence score
landmarks = list(self.landmarks.keys())
if not landmarks:
return None, 0.0
landmark_id = np.random.choice(landmarks)
confidence = np.random.uniform(0.5, 0.95)
return self.landmarks[landmark_id], confidence
# Navigation System
class CampusNavigator:
def __init__(self, db_path='campus_navigation.db'):
self.db_path = db_path
self.graph = self._build_navigation_graph()
def _build_navigation_graph(self):
"""Build a navigation graph from the paths database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create a graph
G = nx.Graph()
# Add nodes (buildings, POIs)
cursor.execute("SELECT id, latitude, longitude FROM buildings")
for building_id, lat, lon in cursor.fetchall():
G.add_node(f"b_{building_id}", pos=(lat, lon), type="building")
cursor.execute("SELECT id, latitude, longitude FROM points_of_interest")
for poi_id, lat, lon in cursor.fetchall():
G.add_node(f"p_{poi_id}", pos=(lat, lon), type="poi")
# Add edges (paths)
cursor.execute("""
SELECT p.id, p.start_point_id, p.end_point_id, p.distance, p.accessible
FROM paths p
""")
for path_id, start_id, end_id, distance, accessible in cursor.fetchall():
# Determine node types and convert IDs
start_type = "b" if start_id <= 6 else "p" # Simplified determination for demo
end_type = "b" if end_id <= 6 else "p"
start_node = f"{start_type}_{start_id}"
end_node = f"{end_type}_{end_id}"
if accessible:
G.add_edge(start_node, end_node, weight=distance, path_id=path_id)
conn.close()
return G
def find_shortest_path(self, start_coords, end_coords):
"""
Find the shortest path between two coordinates.
Parameters:
start_coords (tuple): (latitude, longitude) of starting point
end_coords (tuple): (latitude, longitude) of destination
Returns:
list: List of coordinates forming the path
"""
# Find nearest nodes to coordinates
start_node = self._find_nearest_node(start_coords)
end_node = self._find_nearest_node(end_coords)
if not start_node or not end_node:
return None
try:
# Find shortest path
path = nx.shortest_path(self.graph, source=start_node, target=end_node, weight='weight')
# Convert path to coordinates
path_coords = []
for node in path:
node_data = self.graph.nodes[node]
path_coords.append(node_data['pos'])
return path_coords
except nx.NetworkXNoPath:
return None
def _find_nearest_node(self, coords):
"""Find the nearest node to the given coordinates."""
min_dist = float('inf')
nearest_node = None
for node, data in self.graph.nodes(data=True):
node_coords = data.get('pos')
if node_coords:
dist = geodesic(coords, node_coords).meters
if dist < min_dist:
min_dist = dist
nearest_node = node
return nearest_node
def get_nearby_pois(self, coords, radius=200):
"""Get points of interest within a certain radius (meters)."""
nearby = []
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get all POIs
cursor.execute("""
SELECT id, name, description, poi_type, latitude, longitude
FROM points_of_interest
""")
for poi_id, name, desc, poi_type, lat, lon in cursor.fetchall():
poi_coords = (lat, lon)
dist = geodesic(coords, poi_coords).meters
if dist <= radius:
# Get current status
cursor.execute("""
SELECT status, occupancy_count, last_updated
FROM facility_status
WHERE facility_id = ?
""", (poi_id,))
status_row = cursor.fetchone()
status = {
"status": status_row[0] if status_row else "Unknown",
"occupancy": status_row[1] if status_row else None,
"last_updated": status_row[2] if status_row else None
} if status_row else None
nearby.append({
"id": poi_id,
"name": name,
"description": desc,
"type": poi_type,
"coordinates": poi_coords,
"distance": dist,
"status": status
})
conn.close()
return sorted(nearby, key=lambda x: x["distance"])
def get_building_info(self, building_id):
"""Get detailed information about a building."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT name, description, latitude, longitude, altitude, building_type, floors
FROM buildings
WHERE id = ?
""", (building_id,))
building_data = cursor.fetchone()
if not building_data:
conn.close()
return None
name, desc, lat, lon, alt, b_type, floors = building_data
# Get rooms in the building
cursor.execute("""
SELECT id, room_number, name, floor, room_type, capacity
FROM rooms
WHERE building_id = ?
""", (building_id,))
rooms = []
for room_id, room_num, room_name, floor, room_type, capacity in cursor.fetchall():
rooms.append({
"id": room_id,
"number": room_num,
"name": room_name,
"floor": floor,
"type": room_type,
"capacity": capacity
})
# Get POIs in the building
cursor.execute("""
SELECT id, name, description, poi_type
FROM points_of_interest
WHERE building_id = ?
""", (building_id,))
pois = []
for poi_id, poi_name, poi_desc, poi_type in cursor.fetchall():
pois.append({
"id": poi_id,
"name": poi_name,
"description": poi_desc,
"type": poi_type
})
conn.close()
return {
"id": building_id,
"name": name,
"description": desc,
"coordinates": (lat, lon, alt),
"type": b_type,
"floors": floors,
"rooms": rooms,
"points_of_interest": pois
}
# Flask API Endpoints
navigator = CampusNavigator()
landmark_recognizer = LandmarkRecognizer()
@app.route('/api/navigate', methods=['POST'])
def navigate():
data = request.json
if not data or 'start' not in data or 'destination' not in data:
return jsonify({"error": "Invalid request parameters"}), 400
try:
start_coords = (data['start']['latitude'], data['start']['longitude'])
dest_coords = (data['destination']['latitude'], data['destination']['longitude'])
path = navigator.find_shortest_path(start_coords, dest_coords)
if not path:
return jsonify({"error": "No path found"}), 404
# Format path for response
path_points = []
for i, (lat, lon) in enumerate(path):
point = {
"latitude": lat,
"longitude": lon,
"step": i + 1
}
path_points.append(point)
total_distance = sum(geodesic(path[i], path[i+1]).meters for i in range(len(path) - 1))
response = {
"path": path_points,
"distance": total_distance,
"estimated_time": total_distance / 1.4 / 60 # minutes, assuming a 1.4 m/s walking speed
}
return jsonify(response)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/recognize', methods=['POST'])
def recognize_landmark():
if 'image' not in request.files:
return jsonify({"error": "No image provided"}), 400
image_file = request.files['image']
try:
# Read image
img_array = np.frombuffer(image_file.read(), np.uint8)
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
# Recognize landmark
landmark, confidence = landmark_recognizer.recognize(img)
if not landmark or confidence < 0.6: # Threshold
return jsonify({"recognized": False, "message": "No landmark recognized with confidence"})
# Get building information
building_id = landmark.get("building_id")
building_info = navigator.get_building_info(building_id)
if not building_info:
return jsonify({"recognized": True, "confidence": confidence, "message": "Building information not available"})
return jsonify({
"recognized": True,
"confidence": confidence,
"building": building_info
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/nearby', methods=['GET'])
def nearby_pois():
try:
lat = float(request.args.get('latitude'))
lon = float(request.args.get('longitude'))
radius = float(request.args.get('radius', 200)) # Default 200m
nearby = navigator.get_nearby_pois((lat, lon), radius)
return jsonify({"nearby_points": nearby})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/building/<int:building_id>', methods=['GET'])
def building_info(building_id):
try:
info = navigator.get_building_info(building_id)
if not info:
return jsonify({"error": "Building not found"}), 404
return jsonify({"building": info})
except Exception as e:
return jsonify({"error": str(e)}), 500
# Mobile AR Application Code (Android with ARCore - Java/Kotlin interface)
# This would normally be in a separate project, but here's a conceptual implementation
"""
import android.os.Bundle
import android.widget.Toast
import androidx.appcompat.app.AppCompatActivity
import com.google.ar.core.*
import com.google.ar.core.exceptions.*
import java.util.concurrent.TimeUnit
import retrofit2.Retrofit
import retrofit2.converter.gson.GsonConverterFactory
import okhttp3.OkHttpClient
class ARNavigationActivity : AppCompatActivity() {
private lateinit var arSession: Session
private lateinit var apiService: CampusApiService
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_ar_navigation)
// Setup API service
val client = OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.build()
val retrofit = Retrofit.Builder()
.baseUrl("http://your-server-url/api/")
.client(client)
.addConverterFactory(GsonConverterFactory.create())
.build()
apiService = retrofit.create(CampusApiService::class.java)
// Setup AR session
setupArSession()
}
private fun setupArSession() {
try {
if (ArCoreApk.getInstance().requestInstall(this, true) === ArCoreApk.InstallStatus.INSTALLED) {
arSession = Session(this)
val config = Config(arSession)
config.updateMode = Config.UpdateMode.LATEST_CAMERA_IMAGE
config.focusMode = Config.FocusMode.AUTO
// Configure geospatial features
if (arSession.isGeospatialModeSupported(Config.GeospatialMode.ENABLED)) {
config.geospatialMode = Config.GeospatialMode.ENABLED
}
arSession.configure(config)
// Initialize AR renderer
// ...
}
} catch (e: UnavailableArcoreNotInstalledException) {
// ARCore not installed
Toast.makeText(this, "ARCore not installed", Toast.LENGTH_LONG).show()
} catch (e: Exception) {
// Other errors
Toast.makeText(this, "Error setting up AR: ${e.message}", Toast.LENGTH_LONG).show()
}
}
private fun startNavigation(destination: LatLng) {
// Get current location
val earth = arSession.earth
if (earth?.trackingState == TrackingState.TRACKING) {
val currentLocation = earth.cameraGeospatialPose
// Request navigation path
val navigationRequest = NavigationRequest(
start = LatLng(
currentLocation.latitude,
currentLocation.longitude
),
destination = destination
)
// Call API for navigation
apiService.navigate(navigationRequest).enqueue(object : Callback<NavigationResponse> {
override fun onResponse(call: Call<NavigationResponse>, response: Response<NavigationResponse>) {
if (response.isSuccessful) {
val path = response.body()
displayNavigationPath(path)
} else {
Toast.makeText(this@ARNavigationActivity, "Navigation failed", Toast.LENGTH_SHORT).show()
}
}
override fun onFailure(call: Call<NavigationResponse>, t: Throwable) {
Toast.makeText(this@ARNavigationActivity, "Network error: ${t.message}", Toast.LENGTH_SHORT).show()
}
})
} else {
Toast.makeText(this, "Earth tracking not available", Toast.LENGTH_SHORT).show()
}
}
private fun displayNavigationPath(path: NavigationResponse?) {
// Display AR navigation elements based on the returned path
// This would involve creating anchors, rendering directional arrows, etc.
// ...
}
private fun recognizeLandmark() {
// Capture current camera frame
val frame = arSession.update()
val image = frame.acquireCameraImage()
// Convert to byte array and send to server
// ...
// Call API for landmark recognition
apiService.recognizeLandmark(imageBytes).enqueue(object : Callback<RecognitionResponse> {
override fun onResponse(call: Call<RecognitionResponse>, response: Response<RecognitionResponse>) {
if (response.isSuccessful && response.body()?.recognized == true) {
displayBuildingInfo(response.body()?.building)
}
}
override fun onFailure(call: Call<RecognitionResponse>, t: Throwable) {
Toast.makeText(this@ARNavigationActivity, "Recognition failed: ${t.message}", Toast.LENGTH_SHORT).show()
}
})
}
private fun displayBuildingInfo(building: Building?) {
// Display building information overlay
// ...
}
override fun onResume() {
super.onResume()
arSession.resume()
}
override fun onPause() {
super.onPause()
arSession.pause()
}
}
"""
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
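Once the server is running (python campus_ar_navigation.py), you can exercise the REST endpoints without any AR client at all. Below is a quick hedged example using the requests library and the coordinates seeded by load_sample_data(), assuming the server is listening on localhost port 5000.
# try_navigation_api.py - smoke-test the Flask endpoints above
import requests

BASE = "http://127.0.0.1:5000"

# Route from Dormitory A to the Library (both are buildings in the sample data)
route = requests.post(f"{BASE}/api/navigate", json={
    "start": {"latitude": 40.7135, "longitude": -74.0070},        # Dormitory A
    "destination": {"latitude": 40.7127, "longitude": -74.0050},  # Library
}, timeout=10).json()
print(f"{len(route['path'])} waypoints, {route['distance']:.0f} m, "
      f"about {route['estimated_time']:.1f} minutes on foot")

# List points of interest within 150 m of the main academic building
nearby = requests.get(f"{BASE}/api/nearby", params={
    "latitude": 40.7128, "longitude": -74.0060, "radius": 150
}, timeout=10).json()
for poi in nearby["nearby_points"]:
    print(f"- {poi['name']} ({poi['type']}), {poi['distance']:.0f} m away")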
6. Sentiment-Based Course Feedback Analysis System

Difficulty Level: Intermediate
Skills Developed: Natural Language Processing, Sentiment Analysis, Data Visualization, Web Development
Project Description: Develop a system that analyzes student feedback for courses using sentiment analysis and natural language processing. This tool helps educational institutions understand student satisfaction, identify areas for improvement, and track changes over time.
Key Features:
- Text preprocessing and cleaning
- Sentiment analysis of feedback comments
- Topic modeling to identify common themes
- Trend analysis across semesters
- Interactive visualization dashboard
- Recommendation engine for course improvements
Implementation Steps:
- Collect and preprocess sample course feedback data
- Implement sentiment analysis models using NLTK or spaCy
- Develop topic modeling using Latent Dirichlet Allocation
- Create data visualization components
- Build a web interface using Django or Flask
- Implement the recommendation engine
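If you do not have real survey exports yet, a handful of synthetic rows is enough to exercise the whole pipeline end to end. Here is a minimal sketch that writes a test file; the column names are placeholders that you later map to the analyzer in the upload form.
# make_sample_feedback.py - create a tiny synthetic dataset for testing
import pandas as pd

rows = [
    {"course": "CS101", "instructor": "Dr. Lee", "semester": "2024-Fall", "rating": 5,
     "comment": "Great lectures, the worked examples finally made recursion click."},
    {"course": "CS101", "instructor": "Dr. Lee", "semester": "2024-Fall", "rating": 2,
     "comment": "Assignments were graded slowly and the feedback was vague."},
    {"course": "MATH200", "instructor": "Dr. Patel", "semester": "2025-Spring", "rating": 3,
     "comment": "The pace was fine but the textbook did not match the exams."},
]
pd.DataFrame(rows).to_csv("sample_feedback.csv", index=False)
print(f"Wrote sample_feedback.csv with {len(rows)} rows")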
Sample Code Implementation:
# feedback_analysis.py
import pandas as pd
import numpy as np
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.sentiment import SentimentIntensityAnalyzer
import gensim
from gensim import corpora
from gensim.models import LdaModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import NMF
import matplotlib.pyplot as plt
import seaborn as sns
from flask import Flask, render_template, request, jsonify
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import json
import os
from datetime import datetime
from wordcloud import WordCloud
# Download necessary NLTK resources
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('vader_lexicon')
app = Flask(__name__)
class FeedbackAnalyzer:
def __init__(self):
self.stop_words = set(stopwords.words('english'))
self.lemmatizer = WordNetLemmatizer()
self.sia = SentimentIntensityAnalyzer()
# Add domain-specific stop words
self.stop_words.update(['course', 'class', 'professor', 'instructor', 'student', 'lecture'])
def load_data(self, file_path):
"""Load feedback data from CSV file."""
if file_path.endswith('.csv'):
return pd.read_csv(file_path)
elif file_path.endswith('.xlsx'):
return pd.read_excel(file_path)
else:
raise ValueError("Unsupported file format. Please use CSV or Excel files.")
def preprocess_text(self, text):
"""Clean and preprocess text data."""
if not isinstance(text, str):
return ""
# Convert to lowercase
text = text.lower()
# Remove special characters and numbers
text = re.sub(r'[^a-zA-Z\s]', '', text)
# Tokenize
tokens = word_tokenize(text)
# Remove stopwords and lemmatize
processed_tokens = [self.lemmatizer.lemmatize(token) for token in tokens
if token not in self.stop_words and len(token) > 2]
return ' '.join(processed_tokens)
def analyze_sentiment(self, text):
"""Analyze sentiment of text using VADER."""
if not isinstance(text, str) or not text.strip():
return {'compound': 0, 'pos': 0, 'neu': 0, 'neg': 0}
return self.sia.polarity_scores(text)
def classify_sentiment(self, compound_score):
"""Classify sentiment based on compound score."""
if compound_score >= 0.05:
return 'positive'
elif compound_score <= -0.05:
return 'negative'
else:
return 'neutral'
def extract_topics(self, texts, num_topics=5, method='lda'):
"""Extract topics from a collection of texts."""
if not texts or all(not isinstance(text, str) or not text.strip() for text in texts):
return [], []
# Filter out empty texts
texts = [text for text in texts if isinstance(text, str) and text.strip()]
if method == 'lda':
# Tokenize texts
tokenized_texts = [text.split() for text in texts]
# Create dictionary and corpus
dictionary = corpora.Dictionary(tokenized_texts)
corpus = [dictionary.doc2bow(text) for text in tokenized_texts]
# Train LDA model
lda_model = LdaModel(
corpus=corpus,
id2word=dictionary,
num_topics=num_topics,
passes=15,
alpha='auto',
random_state=42
)
# Extract topics
topics = lda_model.print_topics(num_words=10)
# Map documents to topics
doc_topics = []
for doc in corpus:
topic_probs = lda_model.get_document_topics(doc)
doc_topics.append(max(topic_probs, key=lambda x: x[1])[0] if topic_probs else -1)
return topics, doc_topics
elif method == 'nmf':
# Use Non-negative Matrix Factorization for topic modeling
vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
tfidf = vectorizer.fit_transform(texts)
# Train NMF model
nmf_model = NMF(n_components=num_topics, random_state=42)
nmf_model.fit(tfidf)
# Extract topics
feature_names = vectorizer.get_feature_names_out()
topics = []
for topic_idx, topic in enumerate(nmf_model.components_):
top_features_idx = topic.argsort()[:-11:-1]
top_features = [feature_names[i] for i in top_features_idx]
topic_str = ' '.join(top_features)
topics.append((topic_idx, topic_str))
# Map documents to topics
doc_topic_matrix = nmf_model.transform(tfidf)
doc_topics = doc_topic_matrix.argmax(axis=1).tolist()
return topics, doc_topics
else:
raise ValueError("Unsupported topic extraction method. Use 'lda' or 'nmf'.")
def analyze_feedback(self, data, comment_column, course_column=None, instructor_column=None,
semester_column=None, rating_column=None, student_id_column=None):
"""
Analyze feedback data and return results.
Parameters:
- data: DataFrame with feedback data
- comment_column: Column name containing feedback text
- course_column: Column name for course identification
- instructor_column: Column name for instructor identification
- semester_column: Column name for semester/term identification
- rating_column: Column name for numerical ratings
- student_id_column: Column name for student identification
Returns:
- DataFrame with analysis results
"""
# Copy data to avoid modifying original
df = data.copy()
# Ensure comment column exists
if comment_column not in df.columns:
raise ValueError(f"Comment column '{comment_column}' not found in data.")
# Preprocess comments
df['processed_comment'] = df[comment_column].apply(self.preprocess_text)
# Analyze sentiment
sentiment_scores = df[comment_column].apply(self.analyze_sentiment)
df['sentiment_compound'] = sentiment_scores.apply(lambda x: x['compound'])
df['sentiment_positive'] = sentiment_scores.apply(lambda x: x['pos'])
df['sentiment_neutral'] = sentiment_scores.apply(lambda x: x['neu'])
df['sentiment_negative'] = sentiment_scores.apply(lambda x: x['neg'])
df['sentiment_category'] = df['sentiment_compound'].apply(self.classify_sentiment)
# Extract topics if there are enough valid comments
valid_comments = [c for c in df['processed_comment'].tolist() if isinstance(c, str) and c.strip()]
if len(valid_comments) >= 5: # Minimum number for meaningful topic extraction
topics, doc_topics = self.extract_topics(valid_comments)
# Map topic assignments back to dataframe rows; doc_topics is aligned
# with the non-empty processed comments, in index order
valid_index = [idx for idx in df.index if isinstance(df.loc[idx, 'processed_comment'], str)
and df.loc[idx, 'processed_comment'].strip()]
topic_assignments = {valid_index[i]: topic_idx for i, topic_idx in enumerate(doc_topics) if i < len(valid_index)}
df['topic'] = df.index.map(lambda x: topic_assignments.get(x, -1))
# Create topic descriptions
topic_descriptions = {}
for topic_idx, topic_terms in topics:
# Extract main terms from topic, removing numbers and formatting
terms = re.findall(r'"([^"]*)"', topic_terms)
if terms:
topic_descriptions[topic_idx] = ', '.join(terms[:5]) # Top 5 terms
else:
topic_descriptions[topic_idx] = f"Topic {topic_idx+1}"
df['topic_description'] = df['topic'].map(lambda x: topic_descriptions.get(x, "No Topic") if x != -1 else "No Topic")
else:
df['topic'] = -1
df['topic_description'] = "Insufficient data for topic modeling"
# Group analysis by course, instructor, or semester if specified
results = {'overall': df}
if course_column and course_column in df.columns:
results['by_course'] = df.groupby(course_column).agg({
'sentiment_compound': 'mean',
'sentiment_positive': 'mean',
'sentiment_neutral': 'mean',
'sentiment_negative': 'mean',
comment_column: 'count'
}).reset_index()
if instructor_column and instructor_column in df.columns:
results['by_instructor'] = df.groupby(instructor_column).agg({
'sentiment_compound': 'mean',
'sentiment_positive': 'mean',
'sentiment_neutral': 'mean',
'sentiment_negative': 'mean',
comment_column: 'count'
}).reset_index()
if semester_column and semester_column in df.columns:
results['by_semester'] = df.groupby(semester_column).agg({
'sentiment_compound': 'mean',
'sentiment_positive': 'mean',
'sentiment_neutral': 'mean',
'sentiment_negative': 'mean',
comment_column: 'count'
}).reset_index()
if rating_column and rating_column in df.columns:
# Calculate correlation between ratings and sentiment
results['rating_correlation'] = df[[rating_column, 'sentiment_compound']].corr().iloc[0, 1]
# Group by rating
results['by_rating'] = df.groupby(rating_column).agg({
'sentiment_compound': 'mean',
comment_column: 'count'
}).reset_index()
return results
def generate_recommendations(self, analysis_results):
"""Generate recommendations based on analysis results."""
recommendations = []
# Get overall data
df = analysis_results['overall']
# Find most negative comments
negative_comments = df[df['sentiment_category'] == 'negative'].sort_values('sentiment_compound')
if not negative_comments.empty:
# Analyze most common topics in negative feedback
topic_counts = negative_comments['topic_description'].value_counts()
if not topic_counts.empty:
# Recommend addressing top negative topics
for topic, count in topic_counts.head(3).items():
if topic != "No Topic":
recommendations.append(f"Address concerns related to '{topic}' which appears in {count} negative comments.")
# Check course trends if available
if 'by_course' in analysis_results:
course_data = analysis_results['by_course']
# Find courses with lowest sentiment
low_courses = course_data.sort_values('sentiment_compound').head(3)
for _, row in low_courses.iterrows():
recommendations.append(f"Review course '{row.iloc[0]}' which has a low sentiment score of {row['sentiment_compound']:.2f}.")
# Check instructor trends if available
if 'by_instructor' in analysis_results:
instructor_data = analysis_results['by_instructor']
# Find instructors with lowest sentiment
low_instructors = instructor_data.sort_values('sentiment_compound').head(3)
for _, row in low_instructors.iterrows():
recommendations.append(f"Provide support to instructor '{row.iloc[0]}' whose feedback has a sentiment score of {row['sentiment_compound']:.2f}.")
# Check semester trends if available
if 'by_semester' in analysis_results:
# Sort by the semester column (the grouped frame's first column; assumes sortable labels such as '2024-Fall')
semester_data = analysis_results['by_semester'].sort_values(analysis_results['by_semester'].columns[0])
# Flag a decline if the most recent semester scores lower than the earliest one
if len(semester_data) > 1 and semester_data.iloc[-1]['sentiment_compound'] < semester_data.iloc[0]['sentiment_compound']:
recommendations.append("Sentiment appears to be declining over time. Consider a comprehensive review of curriculum and teaching methods.")
# General recommendations if specific ones couldn't be generated
if not recommendations:
recommendations = [
"Collect more detailed feedback to enable better analysis.",
"Consider adding structured questions to complement open-ended feedback.",
"Implement regular check-ins during the semester rather than end-of-term feedback only."
]
return recommendations
def generate_visualizations(self, analysis_results):
"""Generate visualization data for dashboard."""
visualizations = {}
# Overall sentiment distribution
sentiment_counts = analysis_results['overall']['sentiment_category'].value_counts().reset_index()
sentiment_counts.columns = ['category', 'count']
visualizations['sentiment_distribution'] = {
'data': sentiment_counts.to_dict('records'),
'layout': {
'title': 'Overall Sentiment Distribution',
'xaxis': {'title': 'Sentiment Category'},
'yaxis': {'title': 'Count'}
}
}
# Topic distribution
topic_counts = analysis_results['overall']['topic_description'].value_counts().reset_index()
topic_counts.columns = ['topic', 'count']
topic_counts = topic_counts[topic_counts['topic'] != 'No Topic'] # Filter out no topic
if not topic_counts.empty:
visualizations['topic_distribution'] = {
'data': topic_counts.to_dict('records'),
'layout': {
'title': 'Topic Distribution in Feedback',
'xaxis': {'title': 'Topic'},
'yaxis': {'title': 'Count'}
}
}
# Sentiment by topic
if 'topic_description' in analysis_results['overall'].columns:
topic_sentiment = analysis_results['overall'].groupby('topic_description')['sentiment_compound'].agg(['mean', 'count']).reset_index()
topic_sentiment.columns = ['topic', 'avg_sentiment', 'count']
topic_sentiment = topic_sentiment[topic_sentiment['topic'] != 'No Topic']
if not topic_sentiment.empty:
visualizations['sentiment_by_topic'] = {
'data': topic_sentiment.to_dict('records'),
'layout': {
'title': 'Average Sentiment by Topic',
'xaxis': {'title': 'Topic'},
'yaxis': {'title': 'Average Sentiment Score'}
}
}
# Trend analysis if semester data available
if 'by_semester' in analysis_results:
semester_data = analysis_results['by_semester']
visualizations['sentiment_trend'] = {
'data': semester_data.to_dict('records'),
'layout': {
'title': 'Sentiment Trend Over Time',
'xaxis': {'title': 'Semester'},
'yaxis': {'title': 'Average Sentiment Score'}
}
}
# Course comparison if course data available
if 'by_course' in analysis_results:
course_data = analysis_results['by_course']
visualizations['course_comparison'] = {
'data': course_data.to_dict('records'),
'layout': {
'title': 'Sentiment Comparison by Course',
'xaxis': {'title': 'Course'},
'yaxis': {'title': 'Average Sentiment Score'}
}
}
return visualizations
# Flask web application routes
analyzer = FeedbackAnalyzer()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({'error': 'No file part'})
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'})
# Save file temporarily, keeping the original extension so load_data parses it correctly
ext = os.path.splitext(file.filename)[1].lower() or '.csv'
filename = f"temp_{datetime.now().strftime('%Y%m%d%H%M%S')}{ext}"
file_path = os.path.join('uploads', filename)
os.makedirs('uploads', exist_ok=True)
file.save(file_path)
# Get column mappings from form
comment_column = request.form.get('comment_column')
course_column = request.form.get('course_column')
instructor_column = request.form.get('instructor_column')
semester_column = request.form.get('semester_column')
rating_column = request.form.get('rating_column')
try:
# Load data
data = analyzer.load_data(file_path)
# Preview data for column selection if not provided
if not comment_column:
columns = data.columns.tolist()
return jsonify({
'status': 'column_selection',
'columns': columns,
'filename': filename
})
# Analyze feedback
analysis_results = analyzer.analyze_feedback(
data,
comment_column,
course_column,
instructor_column,
semester_column,
rating_column
)
# Generate recommendations
recommendations = analyzer.generate_recommendations(analysis_results)
# Generate visualizations
visualizations = analyzer.generate_visualizations(analysis_results)
# Prepare sample comments
positive_comments = analysis_results['overall'][analysis_results['overall']['sentiment_category'] == 'positive']
negative_comments = analysis_results['overall'][analysis_results['overall']['sentiment_category'] == 'negative']
sample_positive = positive_comments.sort_values('sentiment_compound', ascending=False).head(5)[comment_column].tolist()
sample_negative = negative_comments.sort_values('sentiment_compound').head(5)[comment_column].tolist()
# Prepare overall stats
stats = {
'total_comments': len(analysis_results['overall']),
'positive_pct': (analysis_results['overall']['sentiment_category'] == 'positive').mean() * 100,
'neutral_pct': (analysis_results['overall']['sentiment_category'] == 'neutral').mean() * 100,
'negative_pct': (analysis_results['overall']['sentiment_category'] == 'negative').mean() * 100,
'avg_sentiment': analysis_results['overall']['sentiment_compound'].mean()
}
# Clean up temporary file
os.remove(file_path)
return jsonify({
'status': 'success',
'stats': stats,
'recommendations': recommendations,
'visualizations': visualizations,
'sample_positive': sample_positive,
'sample_negative': sample_negative
})
except Exception as e:
# Clean up temporary file
if os.path.exists(file_path):
os.remove(file_path)
return jsonify({'error': str(e)})
@app.route('/analyze', methods=['POST'])
def analyze():
filename = request.form.get('filename')
comment_column = request.form.get('comment_column')
course_column = request.form.get('course_column')
instructor_column = request.form.get('instructor_column')
semester_column = request.form.get('semester_column')
rating_column = request.form.get('rating_column')
file_path = os.path.join('uploads', filename)
if not os.path.exists(file_path):
return jsonify({'error': 'File not found. Please upload again.'})
try:
# Load data
data = analyzer.load_data(file_path)
# Analyze feedback
analysis_results = analyzer.analyze_feedback(
data,
comment_column,
course_column,
instructor_column,
semester_column,
rating_column
)
# Generate recommendations
recommendations = analyzer.generate_recommendations(analysis_results)
# Generate visualizations
visualizations = analyzer.generate_visualizations(analysis_results)
# Prepare sample comments
positive_comments = analysis_results['overall'][analysis_results['overall']['sentiment_category'] == 'positive']
negative_comments = analysis_results['overall'][analysis_results['overall']['sentiment_category'] == 'negative']
sample_positive = positive_comments.sort_values('sentiment_compound', ascending=False).head(5)[comment_column].tolist()
sample_negative = negative_comments.sort_values('sentiment_compound').head(5)[comment_column].tolist()
# Prepare overall stats
stats = {
'total_comments': len(analysis_results['overall']),
'positive_pct': (analysis_results['overall']['sentiment_category'] == 'positive').mean() * 100,
'neutral_pct': (analysis_results['overall']['sentiment_category'] == 'neutral').mean() * 100,
'negative_pct': (analysis_results['overall']['sentiment_category'] == 'negative').mean() * 100,
'avg_sentiment': analysis_results['overall']['sentiment_compound'].mean()
}
# Clean up temporary file
os.remove(file_path)
return jsonify({
'status': 'success',
'stats': stats,
'recommendations': recommendations,
'visualizations': visualizations,
'sample_positive': sample_positive,
'sample_negative': sample_negative
})
except Exception as e:
# Clean up temporary file
if os.path.exists(file_path):
os.remove(file_path)
return jsonify({'error': str(e)})
if __name__ == '__main__':
app.run(debug=True)
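You do not need the web front end to experiment with the analyzer; the class can be driven from a plain script. A short hedged example follows, assuming the listing above is saved as feedback_analysis.py and the synthetic sample_feedback.csv from earlier exists.
# analyze_offline.py - run the analyzer without the Flask UI
from feedback_analysis import FeedbackAnalyzer

analyzer = FeedbackAnalyzer()
data = analyzer.load_data("sample_feedback.csv")
results = analyzer.analyze_feedback(
    data,
    comment_column="comment",
    course_column="course",
    instructor_column="instructor",
    semester_column="semester",
    rating_column="rating",
)
print(results["overall"][["comment", "sentiment_category", "sentiment_compound"]])
for tip in analyzer.generate_recommendations(results):
    print("-", tip)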
7. Automated Plagiarism Detection System with Code Analysis

Difficulty Level: Intermediate to Advanced
Skills Developed: Text Analysis, Algorithm Design, Code Parsing, Machine Learning
Project Description: Create a comprehensive plagiarism detection system specifically designed for programming assignments. Unlike simple text-matching tools, this system understands code structure, can detect logic similarities even when variable names change, and provides detailed reports on potential plagiarism instances.
Key Features:
- Code tokenization and normalization
- Algorithm similarity detection
- Natural language explanation comparison
- Source code repository integration
- Detailed similarity reports
- Learning capability to improve detection over time
Implementation Steps:
- Research code similarity algorithms
- Implement code parsing and tokenization
- Develop similarity detection algorithms
- Create a database of code submissions
- Build a web interface for submission and reporting
- Implement machine learning for improved detection
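A quick way to build intuition for the first step, before writing the full detector: strip identifier names out of the token stream and compare token n-grams with Jaccard similarity. This is a heavily simplified version of the fingerprinting idea behind tools like MOSS, not the scoring the full system below uses.
# similarity_baseline.py - token n-gram similarity that ignores renamed identifiers
import io
import keyword
import tokenize

def normalized_tokens(code):
    """Token stream with identifiers and literals abstracted, keywords and operators kept."""
    out = []
    for tok in tokenize.generate_tokens(io.StringIO(code).readline):
        if tok.type == tokenize.NAME:
            out.append(tok.string if keyword.iskeyword(tok.string) else "ID")
        elif tok.type == tokenize.NUMBER:
            out.append("NUM")
        elif tok.type == tokenize.STRING:
            out.append("STR")
        elif tok.type == tokenize.OP:
            out.append(tok.string)
    return out

def ngram_jaccard(a, b, n=3):
    grams_a = {tuple(a[i:i+n]) for i in range(len(a) - n + 1)}
    grams_b = {tuple(b[i:i+n]) for i in range(len(b) - n + 1)}
    if not grams_a or not grams_b:
        return 0.0
    return len(grams_a & grams_b) / len(grams_a | grams_b)

snippet1 = "def total(xs):\n    s = 0\n    for x in xs:\n        s += x\n    return s\n"
snippet2 = "def add_all(values):\n    acc = 0\n    for v in values:\n        acc += v\n    return acc\n"
score = ngram_jaccard(normalized_tokens(snippet1), normalized_tokens(snippet2))
print(f"Token 3-gram similarity: {score:.2f}")  # high even though every identifier was renamed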
Sample Code Implementation:
# plagiarism_detector.py
import os
import re
import ast
import difflib
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import seaborn as sns
from flask import Flask, render_template, request, jsonify
import sqlite3
import hashlib
import datetime
import json
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter
import requests
from github import Github
import tokenize
from io import BytesIO
app = Flask(__name__)
class CodeProcessor:
"""Process code for plagiarism detection."""
def __init__(self):
self.python_lexer = PythonLexer()
self.html_formatter = HtmlFormatter()
def normalize_python_code(self, code):
"""
Normalize Python code by:
1. Removing comments
2. Standardizing variable names
3. Removing whitespace
4. Normalizing function names
"""
try:
# Parse the code into an AST
tree = ast.parse(code)
# Create a visitor to normalize variables and function names
normalizer = ASTNormalizer()
normalized_tree = normalizer.visit(tree)
# Convert back to code
normalized_code = ast.unparse(normalized_tree)
# Remove comments and whitespace
normalized_code = re.sub(r'#.*$', '', normalized_code, flags=re.MULTILINE)
normalized_code = re.sub(r'\s+', ' ', normalized_code)
return normalized_code
except SyntaxError:
# If code can't be parsed, do basic normalization
return self._basic_code_normalization(code)
def _basic_code_normalization(self, code):
"""Basic normalization for when AST parsing fails."""
# Remove comments
code = re.sub(r'#.*$', '', code, flags=re.MULTILINE)
# Remove docstrings
code = re.sub(r'""".*?"""', '', code, flags=re.DOTALL)
code = re.sub(r"'''.*?'''", '', code, flags=re.DOTALL)
# Normalize whitespace
code = re.sub(r'\s+', ' ', code)
# Try to normalize variable names
# This is a simplified approach - AST parsing would be more robust
words = set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code))
replacement_map = {}
var_counter = 0
func_counter = 0
for word in words:
# Skip Python keywords and built-ins
if word in {'if', 'else', 'for', 'while', 'def', 'class', 'return',
'import', 'from', 'as', 'try', 'except', 'finally',
'print', 'True', 'False', 'None', 'and', 'or', 'not',
'in', 'is', 'with', 'range', 'len', 'int', 'str', 'float',
'list', 'dict', 'set', 'tuple'}:
continue
# Check if it's likely a function name (followed by parentheses)
if re.search(r'\b' + re.escape(word) + r'\s*\(', code):
replacement = f'func_{func_counter}'
func_counter += 1
else:
replacement = f'var_{var_counter}'
var_counter += 1
replacement_map[word] = replacement
# Replace identifiers
normalized_code = code
for original, replacement in replacement_map.items():
normalized_code = re.sub(r'\b' + re.escape(original) + r'\b', replacement, normalized_code)
return normalized_code
def tokenize_code(self, code):
"""Tokenize code into a list of meaningful tokens."""
try:
tokens = []
# Create a BytesIO object from the string content
code_bytes = BytesIO(code.encode('utf-8'))
# Tokenize the code
for tok in tokenize.tokenize(code_bytes.readline):
# Skip comments, whitespace and encoding tokens
if tok.type not in {tokenize.COMMENT, tokenize.NL, tokenize.NEWLINE, tokenize.INDENT,
tokenize.DEDENT, tokenize.ENCODING}:
tokens.append((tokenize.tok_name[tok.type], tok.string))
return tokens
except tokenize.TokenError:
# Fall back to simple splitting if tokenize fails
return [('WORD', word) for word in re.findall(r'\b\w+\b', code)]
def extract_code_structure(self, code):
"""Extract the structural elements of the code."""
try:
tree = ast.parse(code)
# Extract information about classes, functions, and control structures
structure_visitor = StructureVisitor()
structure_visitor.visit(tree)
return structure_visitor.structure
except SyntaxError:
# If AST parsing fails, fall back to regex-based extraction
structure = {
'classes': len(re.findall(r'\bclass\s+\w+', code)),
'functions': len(re.findall(r'\bdef\s+\w+', code)),
'if_statements': len(re.findall(r'\bif\s+', code)),
'for_loops': len(re.findall(r'\bfor\s+', code)),
'while_loops': len(re.findall(r'\bwhile\s+', code))
}
return structure
def highlight_code(self, code):
"""Highlight code for HTML display."""
return highlight(code, self.python_lexer, self.html_formatter)
def generate_code_fingerprint(self, code):
"""Generate a fingerprint for code that captures its essence."""
# Normalize code
normalized_code = self.normalize_python_code(code)
# Extract structure
structure = self.extract_code_structure(code)
# Create a fingerprint dictionary
fingerprint = {
'code_hash': hashlib.md5(normalized_code.encode()).hexdigest(),
'structure': structure,
'token_count': len(self.tokenize_code(code)),
'normalized_code': normalized_code
}
return fingerprint
class ASTNormalizer(ast.NodeTransformer):
"""AST visitor to normalize variable and function names."""
def __init__(self):
self.var_map = {}
self.func_map = {}
self.class_map = {}
self.var_counter = 0
self.func_counter = 0
self.class_counter = 0
def visit_Name(self, node):
if isinstance(node.ctx, ast.Store):
# This is a variable definition
if node.id not in self.var_map:
self.var_map[node.id] = f'var_{self.var_counter}'
self.var_counter += 1
node.id = self.var_map[node.id]
elif isinstance(node.ctx, ast.Load):
# This is a variable usage
if node.id in self.var_map:
node.id = self.var_map[node.id]
# Don't normalize built-ins and imports
return node
def visit_FunctionDef(self, node):
# Normalize function name
if node.name not in self.func_map:
self.func_map[node.name] = f'func_{self.func_counter}'
self.func_counter += 1
node.name = self.func_map[node.name]
# Process function body
for field, old_value in ast.iter_fields(node):
if isinstance(old_value, list):
new_values = []
for value in old_value:
if isinstance(value, ast.AST):
value = self.visit(value)
if value is None:
continue
elif not isinstance(value, ast.AST):
new_values.extend(value)
continue
new_values.append(value)
old_value[:] = new_values
elif isinstance(old_value, ast.AST):
new_node = self.visit(old_value)
if new_node is None:
delattr(node, field)
else:
setattr(node, field, new_node)
return node
def visit_ClassDef(self, node):
# Normalize class name
if node.name not in self.class_map:
self.class_map[node.name] = f'class_{self.class_counter}'
self.class_counter += 1
node.name = self.class_map[node.name]
# Process class body
for field, old_value in ast.iter_fields(node):
if isinstance(old_value, list):
new_values = []
for value in old_value:
if isinstance(value, ast.AST):
value = self.visit(value)
if value is None:
continue
elif not isinstance(value, ast.AST):
new_values.extend(value)
continue
new_values.append(value)
old_value[:] = new_values
elif isinstance(old_value, ast.AST):
new_node = self.visit(old_value)
if new_node is None:
delattr(node, field)
else:
setattr(node, field, new_node)
return node
class StructureVisitor(ast.NodeVisitor):
"""AST visitor to extract code structure information."""
def __init__(self):
self.structure = {
'classes': 0,
'functions': 0,
'methods': 0,
'if_statements': 0,
'for_loops': 0,
'while_loops': 0,
'try_blocks': 0,
'imports': 0,
'assignments': 0,
'returns': 0,
'function_calls': 0,
'list_comprehensions': 0,
'max_nesting_depth': 0,
'current_nesting_depth': 0
}
def visit_ClassDef(self, node):
self.structure['classes'] += 1
# Remember that we are inside a class body so nested defs count as methods
previously_in_class = getattr(self, '_in_class', False)
self._in_class = True
self.generic_visit(node)
self._in_class = previously_in_class
def visit_FunctionDef(self, node):
# Functions defined inside a class body are counted as methods
if getattr(self, '_in_class', False):
self.structure['methods'] += 1
else:
self.structure['functions'] += 1
# Visit function body with increased nesting depth
self.structure['current_nesting_depth'] += 1
self.structure['max_nesting_depth'] = max(
self.structure['max_nesting_depth'],
self.structure['current_nesting_depth']
)
self.generic_visit(node)
# Restore nesting depth
self.structure['current_nesting_depth'] -= 1
def visit_If(self, node):
self.structure['if_statements'] += 1
# Increase nesting for the body
self.structure['current_nesting_depth'] += 1
self.structure['max_nesting_depth'] = max(
self.structure['max_nesting_depth'],
self.structure['current_nesting_depth']
)
self.generic_visit(node)
# Restore nesting depth
self.structure['current_nesting_depth'] -= 1
def visit_For(self, node):
self.structure['for_loops'] += 1
# Increase nesting for the loop body
self.structure['current_nesting_depth'] += 1
self.structure['max_nesting_depth'] = max(
self.structure['max_nesting_depth'],
self.structure['current_nesting_depth']
)
self.generic_visit(node)
# Restore nesting depth
self.structure['current_nesting_depth'] -= 1
def visit_While(self, node):
self.structure['while_loops'] += 1
# Increase nesting for the loop body
self.structure['current_nesting_depth'] += 1
self.structure['max_nesting_depth'] = max(
self.structure['max_nesting_depth'],
self.structure['current_nesting_depth']
)
self.generic_visit(node)
# Restore nesting depth
self.structure['current_nesting_depth'] -= 1
def visit_Try(self, node):
self.structure['try_blocks'] += 1
self.generic_visit(node)
def visit_Import(self, node):
self.structure['imports'] += len(node.names)
self.generic_visit(node)
def visit_ImportFrom(self, node):
self.structure['imports'] += len(node.names)
self.generic_visit(node)
def visit_Assign(self, node):
self.structure['assignments'] += 1
self.generic_visit(node)
def visit_Return(self, node):
self.structure['returns'] += 1
self.generic_visit(node)
def visit_Call(self, node):
self.structure['function_calls'] += 1
self.generic_visit(node)
def visit_ListComp(self, node):
self.structure['list_comprehensions'] += 1
self.generic_visit(node)
class PlagiarismDetector:
"""Detect plagiarism in code submissions."""
def __init__(self, db_path='plagiarism.db'):
self.db_path = db_path
self.code_processor = CodeProcessor()
self.init_db()
def init_db(self):
"""Initialize the database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create submissions table
cursor.execute('''
CREATE TABLE IF NOT EXISTS submissions (
id INTEGER PRIMARY KEY,
assignment_id TEXT,
student_id TEXT,
submission_time TIMESTAMP,
code_hash TEXT,
code_text TEXT,
normalized_code TEXT,
fingerprint TEXT
)
''')
# Create comparison results table
cursor.execute('''
CREATE TABLE IF NOT EXISTS comparisons (
id INTEGER PRIMARY KEY,
submission1_id INTEGER,
submission2_id INTEGER,
similarity_score REAL,
comparison_type TEXT,
comparison_time TIMESTAMP,
details TEXT,
FOREIGN KEY (submission1_id) REFERENCES submissions (id),
FOREIGN KEY (submission2_id) REFERENCES submissions (id)
)
''')
conn.commit()
conn.close()
def add_submission(self, assignment_id, student_id, code_text):
"""Add a new code submission to the database."""
# Process the code
fingerprint = self.code_processor.generate_code_fingerprint(code_text)
# Store in database
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO submissions
(assignment_id, student_id, submission_time, code_hash, code_text, normalized_code, fingerprint)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
assignment_id,
student_id,
datetime.datetime.now(),
fingerprint['code_hash'],
code_text,
fingerprint['normalized_code'],
json.dumps(fingerprint)
))
submission_id = cursor.lastrowid
conn.commit()
conn.close()
return submission_id
def compare_submissions(self, submission1_id, submission2_id):
"""Compare two submissions and calculate similarity scores."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get submission data
cursor.execute('''
SELECT code_text, normalized_code, fingerprint
FROM submissions
WHERE id = ?
''', (submission1_id,))
sub1_data = cursor.fetchone()
cursor.execute('''
SELECT code_text, normalized_code, fingerprint
FROM submissions
WHERE id = ?
''', (submission2_id,))
sub2_data = cursor.fetchone()
if not sub1_data or not sub2_data:
conn.close()
return None
code1, norm_code1, fingerprint1 = sub1_data
code2, norm_code2, fingerprint2 = sub2_data
fingerprint1 = json.loads(fingerprint1)
fingerprint2 = json.loads(fingerprint2)
# Calculate similarity scores using different methods
# 1. Normalized code similarity (TF-IDF + Cosine)
vectorizer = TfidfVectorizer()
try:
tfidf_matrix = vectorizer.fit_transform([norm_code1, norm_code2])
code_similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
except ValueError:
# Empty vocabulary, e.g. both snippets normalized to nothing
code_similarity = 0
# 2. Structure similarity
structure1 = fingerprint1['structure']
structure2 = fingerprint2['structure']
# Convert structures to vectors
structure_vec1 = np.array([
structure1.get('classes', 0),
structure1.get('functions', 0),
structure1.get('methods', 0),
structure1.get('if_statements', 0),
structure1.get('for_loops', 0),
structure1.get('while_loops', 0),
structure1.get('try_blocks', 0),
structure1.get('imports', 0),
structure1.get('assignments', 0),
structure1.get('returns', 0),
structure1.get('function_calls', 0),
structure1.get('list_comprehensions', 0),
structure1.get('max_nesting_depth', 0)
])
structure_vec2 = np.array([
structure2.get('classes', 0),
structure2.get('functions', 0),
structure2.get('methods', 0),
structure2.get('if_statements', 0),
structure2.get('for_loops', 0),
structure2.get('while_loops', 0),
structure2.get('try_blocks', 0),
structure2.get('imports', 0),
structure2.get('assignments', 0),
structure2.get('returns', 0),
structure2.get('function_calls', 0),
structure2.get('list_comprehensions', 0),
structure2.get('max_nesting_depth', 0)
])
# Normalize vectors
norm1 = np.linalg.norm(structure_vec1)
norm2 = np.linalg.norm(structure_vec2)
if norm1 > 0 and norm2 > 0:
structure_vec1 = structure_vec1 / norm1
structure_vec2 = structure_vec2 / norm2
structure_similarity = np.dot(structure_vec1, structure_vec2)
else:
structure_similarity = 0
# 3. Sequence similarity using diff
sequence_similarity = difflib.SequenceMatcher(None, norm_code1, norm_code2).ratio()
# Calculate overall similarity score (weighted average)
overall_similarity = (0.5 * code_similarity +
0.3 * structure_similarity +
0.2 * sequence_similarity)
# Store comparison results
details = {
'code_similarity': code_similarity,
'structure_similarity': structure_similarity,
'sequence_similarity': sequence_similarity,
'diff_highlights': [], # Placeholder for diff highlights
'structure_comparison': {
'structure1': structure1,
'structure2': structure2
}
}
# Generate diff highlights for user-friendly display
diff = difflib.ndiff(norm_code1.splitlines(), norm_code2.splitlines())
diff_highlights = list(diff)
details['diff_highlights'] = diff_highlights
cursor.execute('''
INSERT INTO comparisons
(submission1_id, submission2_id, similarity_score, comparison_type, comparison_time, details)
VALUES (?, ?, ?, ?, ?, ?)
''', (
submission1_id,
submission2_id,
overall_similarity,
'code',
datetime.datetime.now(),
json.dumps(details)
))
comparison_id = cursor.lastrowid
conn.commit()
conn.close()
return {
'id': comparison_id,
'overall_similarity': overall_similarity,
'code_similarity': code_similarity,
'structure_similarity': structure_similarity,
'sequence_similarity': sequence_similarity,
'details': details
}
def check_plagiarism(self, assignment_id, submission_id):
"""Check a submission against all other submissions for the same assignment."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get all other submissions for the same assignment
cursor.execute('''
SELECT id, student_id
FROM submissions
WHERE assignment_id = ? AND id != ?
''', (assignment_id, submission_id))
other_submissions = cursor.fetchall()
conn.close()
results = []
for other_id, other_student in other_submissions:
comparison = self.compare_submissions(submission_id, other_id)
if comparison:
results.append({
'other_submission_id': other_id,
'other_student_id': other_student,
'similarity': comparison['overall_similarity'],
'comparison_id': comparison['id']
})
# Sort by similarity, highest first
results.sort(key=lambda x: x['similarity'], reverse=True)
return results
def get_comparison_details(self, comparison_id):
"""Get detailed information about a comparison."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT c.similarity_score, c.details,
s1.student_id as student1_id, s1.code_text as code1,
s2.student_id as student2_id, s2.code_text as code2
FROM comparisons c
JOIN submissions s1 ON c.submission1_id = s1.id
JOIN submissions s2 ON c.submission2_id = s2.id
WHERE c.id = ?
''', (comparison_id,))
result = cursor.fetchone()
conn.close()
if not result:
return None
similarity, details_json, student1, code1, student2, code2 = result
details = json.loads(details_json)
# Highlight code for display
highlighted_code1 = self.code_processor.highlight_code(code1)
highlighted_code2 = self.code_processor.highlight_code(code2)
return {
'similarity': similarity,
'student1': student1,
'student2': student2,
'code1': code1,
'code2': code2,
'highlighted_code1': highlighted_code1,
'highlighted_code2': highlighted_code2,
'details': details
}
# Flask application routes
detector = PlagiarismDetector()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/submit', methods=['POST'])
def submit_code():
try:
data = request.json
assignment_id = data.get('assignment_id')
student_id = data.get('student_id')
code = data.get('code')
if not assignment_id or not student_id or not code:
return jsonify({'error': 'Missing required fields'}), 400
# Add submission to database
submission_id = detector.add_submission(assignment_id, student_id, code)
# Check for plagiarism
results = detector.check_plagiarism(assignment_id, submission_id)
# Categorize results
high_similarity = [r for r in results if r['similarity'] > 0.8]
medium_similarity = [r for r in results if 0.6 <= r['similarity'] <= 0.8]
low_similarity = [r for r in results if 0.4 <= r['similarity'] < 0.6]
return jsonify({
'submission_id': submission_id,
'plagiarism_check': {
'high_similarity': high_similarity,
'medium_similarity': medium_similarity,
'low_similarity': low_similarity,
'total_comparisons': len(results)
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/compare/<int:comparison_id>')
def get_comparison(comparison_id):
try:
details = detector.get_comparison_details(comparison_id)
if not details:
return jsonify({'error': 'Comparison not found'}), 404
return jsonify(details)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/github_import', methods=['POST'])
def github_import():
try:
data = request.json
repo_url = data.get('repo_url')
assignment_id = data.get('assignment_id')
access_token = data.get('access_token') # Optional
if not repo_url or not assignment_id:
return jsonify({'error': 'Missing required fields'}), 400
# Extract owner and repo name from URL
match = re.search(r'github\.com/([^/]+)/([^/]+)', repo_url)
if not match:
return jsonify({'error': 'Invalid GitHub repository URL'}), 400
owner, repo_name = match.groups()
# Initialize GitHub client
g = Github(access_token) if access_token else Github()
# Get repository
repo = g.get_repo(f"{owner}/{repo_name}")
# Get all Python files
contents = repo.get_contents("")
python_files = []
while contents:
file_content = contents.pop(0)
if file_content.type == "dir":
contents.extend(repo.get_contents(file_content.path))
elif file_content.path.endswith(".py"):
python_files.append({
'path': file_content.path,
'content': file_content.decoded_content.decode('utf-8')
})
# Process each file as a submission
submission_ids = []
for file in python_files:
# Use file path as student ID for demonstration
student_id = file['path']
code = file['content']
submission_id = detector.add_submission(assignment_id, student_id, code)
submission_ids.append({
'submission_id': submission_id,
'file_path': file['path']
})
# Run cross-comparisons
comparison_results = []
for i, sub1 in enumerate(submission_ids):
for sub2 in submission_ids[i+1:]:
comparison = detector.compare_submissions(sub1['submission_id'], sub2['submission_id'])
if comparison:
comparison_results.append({
'file1': sub1['file_path'],
'file2': sub2['file_path'],
'similarity': comparison['overall_similarity'],
'comparison_id': comparison['id']
})
# Sort by similarity
comparison_results.sort(key=lambda x: x['similarity'], reverse=True)
return jsonify({
'submission_count': len(submission_ids),
'submissions': submission_ids,
'comparisons': comparison_results
})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True)
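Want to see the detector in action? Below is a minimal client sketch using the requests library. The endpoint paths match the Flask routes above; the localhost address, assignment ID, and student ID are placeholder assumptions for the example.
# try_plagiarism_api.py - minimal client sketch (assumes the Flask app above is running locally)
import requests

BASE_URL = "http://localhost:5000"  # assumed development address

sample_code = """
def bubble_sort(items):
    for i in range(len(items)):
        for j in range(len(items) - i - 1):
            if items[j] > items[j + 1]:
                items[j], items[j + 1] = items[j + 1], items[j]
    return items
"""

# Submit a piece of code for checking (IDs are made up for illustration)
response = requests.post(f"{BASE_URL}/submit", json={
    "assignment_id": "cs101-hw3",
    "student_id": "student_42",
    "code": sample_code,
})
result = response.json()

# Summarize the categorized similarity results returned by the /submit route
check = result.get("plagiarism_check", {})
print(f"Submission {result.get('submission_id')}:",
      f"{len(check.get('high_similarity', []))} high,",
      f"{len(check.get('medium_similarity', []))} medium,",
      f"{len(check.get('low_similarity', []))} low similarity matches",
      f"out of {check.get('total_comparisons', 0)} comparisons")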
Resources:
8. Virtual Collaborative Research Laboratory

Difficulty Level: Advanced
Skills Developed: Real-time Communication, Collaboration Tools, Version Control, Data Visualization
Project Description: Develop a virtual collaborative environment where students can work together on research projects in real-time. This platform includes shared code editing, data visualization, video conferencing, and integrated research tools, making remote collaboration seamless for academic projects.
Key Features:
- Real-time collaborative code editing
- Integrated version control
- Interactive data visualization workspace
- Video conferencing with screen sharing
- Project management tools
- Research paper citation and organization
Implementation Steps:
- Set up the backend infrastructure with Django or Flask
- Implement WebSockets for real-time collaboration
- Integrate a code editor component
- Develop the data visualization workspace
- Add video conferencing capability
- Create the project management interface
Sample Code Implementation:
# collaborative_lab.py
from flask import Flask, render_template, request, jsonify, session, redirect, url_for
from flask_socketio import SocketIO, emit, join_room, leave_room
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
import os
import json
import uuid
import datetime
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg') # Use Agg backend for headless environments
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
from flask_cors import CORS
import logging
import re
# Configuration
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///collaborative_lab.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50MB max upload
app.config['ALLOWED_EXTENSIONS'] = {'csv', 'xlsx', 'txt', 'json', 'py', 'ipynb', 'pdf', 'docx'}
# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Initialize extensions
db = SQLAlchemy(app)
migrate = Migrate(app, db)
socketio = SocketIO(app, cors_allowed_origins="*")
CORS(app)
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Database Models
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(128))
full_name = db.Column(db.String(100))
institution = db.Column(db.String(100))
created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
projects = db.relationship('Project', secondary='project_members', backref='members')
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
class Project(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
description = db.Column(db.Text)
created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
owner_id = db.Column(db.Integer, db.ForeignKey('user.id'))
owner = db.relationship('User', foreign_keys=[owner_id])
files = db.relationship('File', backref='project', cascade='all, delete-orphan')
tasks = db.relationship('Task', backref='project', cascade='all, delete-orphan')
citations = db.relationship('Citation', backref='project', cascade='all, delete-orphan')
# Association table for many-to-many relationship between users and projects
project_members = db.Table('project_members',
db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
db.Column('project_id', db.Integer, db.ForeignKey('project.id'), primary_key=True)
)
class File(db.Model):
id = db.Column(db.Integer, primary_key=True)
filename = db.Column(db.String(100), nullable=False)
file_path = db.Column(db.String(255), nullable=False)
file_type = db.Column(db.String(20))
description = db.Column(db.Text)
uploaded_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
uploaded_by = db.Column(db.Integer, db.ForeignKey('user.id'))
project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
version = db.Column(db.Integer, default=1)
uploader = db.relationship('User', foreign_keys=[uploaded_by])
class FileVersion(db.Model):
id = db.Column(db.Integer, primary_key=True)
file_id = db.Column(db.Integer, db.ForeignKey('file.id'))
version = db.Column(db.Integer, nullable=False)
file_path = db.Column(db.String(255), nullable=False)
committed_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
committed_by = db.Column(db.Integer, db.ForeignKey('user.id'))
commit_message = db.Column(db.Text)
file = db.relationship('File', foreign_keys=[file_id])
committer = db.relationship('User', foreign_keys=[committed_by])
class Task(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100), nullable=False)
description = db.Column(db.Text)
status = db.Column(db.String(20), default='todo') # todo, in_progress, review, done
priority = db.Column(db.String(20), default='medium') # low, medium, high
created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
due_date = db.Column(db.DateTime)
assigned_to = db.Column(db.Integer, db.ForeignKey('user.id'))
created_by = db.Column(db.Integer, db.ForeignKey('user.id'))
project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
assignee = db.relationship('User', foreign_keys=[assigned_to])
creator = db.relationship('User', foreign_keys=[created_by])
class Citation(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(255), nullable=False)
authors = db.Column(db.Text)
journal = db.Column(db.String(255))
year = db.Column(db.Integer)
doi = db.Column(db.String(100))
url = db.Column(db.String(255))
abstract = db.Column(db.Text)
notes = db.Column(db.Text)
added_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
added_by = db.Column(db.Integer, db.ForeignKey('user.id'))
project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
adder = db.relationship('User', foreign_keys=[added_by])
class ChatMessage(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text, nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
user = db.relationship('User', foreign_keys=[user_id])
project = db.relationship('Project', foreign_keys=[project_id])
# Helper Functions
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']
def save_file(file, project_id, user_id, description=""):
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_type = filename.rsplit('.', 1)[1].lower()
# Create unique filename
unique_filename = f"{uuid.uuid4()}_{filename}"
file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(file_path)
# Create file record
new_file = File(
filename=filename,
file_path=file_path,
file_type=file_type,
description=description,
uploaded_by=user_id,
project_id=project_id
)
db.session.add(new_file)
db.session.commit()
# Create initial version
new_version = FileVersion(
file_id=new_file.id,
version=1,
file_path=file_path,
committed_by=user_id,
commit_message="Initial upload"
)
db.session.add(new_version)
db.session.commit()
return new_file
return None
def update_file(file_id, new_file, user_id, commit_message=""):
file_record = File.query.get(file_id)
if not file_record:
return None
if new_file and allowed_file(new_file.filename):
# Create new version
new_version_num = file_record.version + 1
# Create unique filename for new version
filename = secure_filename(file_record.filename)
unique_filename = f"{uuid.uuid4()}_{filename}"
file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
new_file.save(file_path)
# Update file record
file_record.version = new_version_num
file_record.file_path = file_path
file_record.updated_at = datetime.datetime.utcnow()
# Create version record
new_version = FileVersion(
file_id=file_id,
version=new_version_num,
file_path=file_path,
committed_by=user_id,
commit_message=commit_message
)
db.session.add(new_version)
db.session.commit()
return file_record
return None
def get_file_content(file_path, file_type):
"""Get content of a file, possibly with preprocessing for specific types."""
if not os.path.exists(file_path):
return None
if file_type in ['csv', 'txt']:
with open(file_path, 'r') as f:
return f.read()
elif file_type == 'json':
with open(file_path, 'r') as f:
return json.load(f)
elif file_type == 'py':
with open(file_path, 'r') as f:
return f.read()
else:
# For binary files, just indicate they exist but don't return content
return f"Binary file of type {file_type}"
def generate_visualization(file_path, file_type, vis_type, options):
"""Generate a visualization based on the given file and options."""
if file_type == 'csv':
try:
df = pd.read_csv(file_path)
return create_visualization(df, vis_type, options)
except Exception as e:
return {'error': str(e)}
elif file_type == 'xlsx':
try:
df = pd.read_excel(file_path)
return create_visualization(df, vis_type, options)
except Exception as e:
return {'error': str(e)}
else:
return {'error': 'Unsupported file type for visualization'}
def create_visualization(df, vis_type, options):
"""Create a visualization using matplotlib/seaborn."""
plt.figure(figsize=(10, 6))
try:
if vis_type == 'line':
x = options.get('x')
y = options.get('y')
if x and y:
sns.lineplot(data=df, x=x, y=y)
plt.title(options.get('title', f"{y} vs {x}"))
else:
return {'error': 'Missing x or y axis specification'}
elif vis_type == 'bar':
x = options.get('x')
y = options.get('y')
if x and y:
sns.barplot(data=df, x=x, y=y)
plt.title(options.get('title', f"{y} by {x}"))
else:
return {'error': 'Missing x or y axis specification'}
elif vis_type == 'scatter':
x = options.get('x')
y = options.get('y')
if x and y:
hue = options.get('hue')
if hue:
sns.scatterplot(data=df, x=x, y=y, hue=hue)
else:
sns.scatterplot(data=df, x=x, y=y)
plt.title(options.get('title', f"{y} vs {x}"))
else:
return {'error': 'Missing x or y axis specification'}
elif vis_type == 'histogram':
column = options.get('column')
if column:
sns.histplot(data=df, x=column, bins=options.get('bins', 10))
plt.title(options.get('title', f"Distribution of {column}"))
else:
return {'error': 'Missing column specification'}
elif vis_type == 'heatmap':
if not options.get('columns'):
# Use numeric columns for correlation heatmap
numeric_df = df.select_dtypes(include=[np.number])
if numeric_df.empty:
return {'error': 'No numeric columns for heatmap'}
corr = numeric_df.corr()
sns.heatmap(corr, annot=True, cmap='coolwarm')
plt.title(options.get('title', 'Correlation Heatmap'))
else:
# Use specified columns
columns = options.get('columns')
pivot_column = options.get('pivot_column')
value_column = options.get('value_column')
if pivot_column and value_column:
pivot_table = df.pivot(index=columns[0], columns=pivot_column, values=value_column)
sns.heatmap(pivot_table, annot=True, cmap='coolwarm')
plt.title(options.get('title', f'Heatmap of {value_column}'))
else:
return {'error': 'Missing pivot or value column specification'}
else:
return {'error': 'Unsupported visualization type'}
# Save figure to a bytes buffer
buf = io.BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
# Convert to base64 for embedding in HTML
data = base64.b64encode(buf.read()).decode('utf-8')
plt.close()
return {
'image': f'data:image/png;base64,{data}',
'type': vis_type
}
except Exception as e:
plt.close()
return {'error': str(e)}
# API Routes
@app.route('/')
def home():
if 'user_id' in session:
return redirect(url_for('dashboard'))
return render_template('index.html')
@app.route('/api/register', methods=['POST'])
def register():
data = request.json
# Check if user already exists
existing_user = User.query.filter_by(email=data['email']).first()
if existing_user:
return jsonify({'error': 'Email already registered'}), 400
existing_username = User.query.filter_by(username=data['username']).first()
if existing_username:
return jsonify({'error': 'Username already taken'}), 400
# Create new user
new_user = User(
username=data['username'],
email=data['email'],
full_name=data.get('full_name', ''),
institution=data.get('institution', '')
)
new_user.set_password(data['password'])
db.session.add(new_user)
db.session.commit()
return jsonify({'message': 'User registered successfully', 'user_id': new_user.id})
@app.route('/api/login', methods=['POST'])
def login():
data = request.json
user = User.query.filter_by(username=data['username']).first()
if not user or not user.check_password(data['password']):
return jsonify({'error': 'Invalid username or password'}), 401
session['user_id'] = user.id
return jsonify({
'message': 'Login successful',
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'full_name': user.full_name
}
})
@app.route('/api/logout', methods=['POST'])
def logout():
session.pop('user_id', None)
return jsonify({'message': 'Logout successful'})
@app.route('/api/projects', methods=['GET'])
def get_projects():
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
user = User.query.get(session['user_id'])
if not user:
return jsonify({'error': 'User not found'}), 404
# Get projects where user is either owner or member
owned_projects = Project.query.filter_by(owner_id=user.id).all()
member_projects = user.projects
all_projects = set(owned_projects + member_projects)
projects_data = []
for project in all_projects:
projects_data.append({
'id': project.id,
'name': project.name,
'description': project.description,
'created_at': project.created_at.isoformat(),
'updated_at': project.updated_at.isoformat(),
'owner': {
'id': project.owner.id,
'username': project.owner.username
} if project.owner else None,
'is_owner': project.owner_id == user.id,
'member_count': len(project.members)
})
return jsonify(projects_data)
@app.route('/api/projects', methods=['POST'])
def create_project():
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
data = request.json
new_project = Project(
name=data['name'],
description=data.get('description', ''),
owner_id=session['user_id']
)
# Add owner as a member
user = User.query.get(session['user_id'])
new_project.members.append(user)
db.session.add(new_project)
db.session.commit()
return jsonify({
'message': 'Project created successfully',
'project': {
'id': new_project.id,
'name': new_project.name,
'description': new_project.description
}
})
@app.route('/api/projects/<int:project_id>', methods=['GET'])
def get_project(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
project = Project.query.get(project_id)
if not project:
return jsonify({'error': 'Project not found'}), 404
# Check if user is a member
user = User.query.get(session['user_id'])
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403
members_data = []
for member in project.members:
members_data.append({
'id': member.id,
'username': member.username,
'full_name': member.full_name,
'is_owner': member.id == project.owner_id
})
files_data = []
for file in project.files:
files_data.append({
'id': file.id,
'filename': file.filename,
'file_type': file.file_type,
'description': file.description,
'uploaded_at': file.uploaded_at.isoformat(),
'updated_at': file.updated_at.isoformat(),
'version': file.version,
'uploader': {
'id': file.uploader.id,
'username': file.uploader.username
} if file.uploader else None
})
tasks_data = []
for task in project.tasks:
tasks_data.append({
'id': task.id,
'title': task.title,
'description': task.description,
'status': task.status,
'priority': task.priority,
'created_at': task.created_at.isoformat(),
'due_date': task.due_date.isoformat() if task.due_date else None,
'assignee': {
'id': task.assignee.id,
'username': task.assignee.username
} if task.assignee else None,
'creator': {
'id': task.creator.id,
'username': task.creator.username
} if task.creator else None
})
citations_data = []
for citation in project.citations:
citations_data.append({
'id': citation.id,
'title': citation.title,
'authors': citation.authors,
'journal': citation.journal,
'year': citation.year,
'doi': citation.doi,
'url': citation.url
})
project_data = {
'id': project.id,
'name': project.name,
'description': project.description,
'created_at': project.created_at.isoformat(),
'updated_at': project.updated_at.isoformat(),
'owner': {
'id': project.owner.id,
'username': project.owner.username,
'full_name': project.owner.full_name
} if project.owner else None,
'members': members_data,
'files': files_data,
'tasks': tasks_data,
'citations': citations_data,
'is_owner': project.owner_id == user.id
}
return jsonify(project_data)
@app.route('/api/projects/<int:project_id>/members', methods=['POST'])
def add_member(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
project = Project.query.get(project_id)
if not project:
return jsonify({'error': 'Project not found'}), 404
# Verify user is project owner
if project.owner_id != session['user_id']:
return jsonify({'error': 'Only the project owner can add members'}), 403
data = request.json
username = data.get('username')
email = data.get('email')
# Find user by username or email
if username:
new_member = User.query.filter_by(username=username).first()
elif email:
new_member = User.query.filter_by(email=email).first()
else:
return jsonify({'error': 'Username or email required'}), 400
if not new_member:
return jsonify({'error': 'User not found'}), 404
# Check if user is already a member
if new_member in project.members:
return jsonify({'error': 'User is already a project member'}), 400
# Add member
project.members.append(new_member)
db.session.commit()
return jsonify({
'message': 'Member added successfully',
'member': {
'id': new_member.id,
'username': new_member.username,
'full_name': new_member.full_name
}
})
@app.route('/api/projects/<int:project_id>/members/<int:user_id>', methods=['DELETE'])
def remove_member(project_id, user_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
project = Project.query.get(project_id)
if not project:
return jsonify({'error': 'Project not found'}), 404
# Only project owner can remove members
if project.owner_id != session['user_id']:
return jsonify({'error': 'Only the project owner can remove members'}), 403
# Owner cannot be removed
if user_id == project.owner_id:
return jsonify({'error': 'Owner cannot be removed from project'}), 400
member = User.query.get(user_id)
if not member:
return jsonify({'error': 'User not found'}), 404
if member not in project.members:
return jsonify({'error': 'User is not a project member'}), 400
# Remove member
project.members.remove(member)
db.session.commit()
return jsonify({'message': 'Member removed successfully'})
@app.route('/api/projects/<int:project_id>/files', methods=['POST'])
def upload_file_route(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
project = Project.query.get(project_id)
if not project:
return jsonify({'error': 'Project not found'}), 404
# Check if user is a member
user = User.query.get(session['user_id'])
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
description = request.form.get('description', '')
new_file = save_file(file, project_id, user.id, description)
if not new_file:
return jsonify({'error': 'File upload failed'}), 400
return jsonify({
'message': 'File uploaded successfully',
'file': {
'id': new_file.id,
'filename': new_file.filename,
'file_type': new_file.file_type,
'description': new_file.description,
'uploaded_at': new_file.uploaded_at.isoformat(),
'version': new_file.version
}
})
@app.route('/api/files/<int:file_id>', methods=['GET'])
def get_file(file_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
file_record = File.query.get(file_id)
if not file_record:
return jsonify({'error': 'File not found'}), 404
# Check if user is a project member
user = User.query.get(session['user_id'])
project = Project.query.get(file_record.project_id)
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403
# Get file versions
versions = FileVersion.query.filter_by(file_id=file_id).order_by(FileVersion.version.desc()).all()
versions_data = []
for version in versions:
versions_data.append({
'version': version.version,
'committed_at': version.committed_at.isoformat(),
'committer': {
'id': version.committer.id,
'username': version.committer.username
} if version.committer else None,
'commit_message': version.commit_message
})
# Get file content
content = get_file_content(file_record.file_path, file_record.file_type)
file_data = {
'id': file_record.id,
'filename': file_record.filename,
'file_type': file_record.file_type,
'description': file_record.description,
'uploaded_at': file_record.uploaded_at.isoformat(),
'updated_at': file_record.updated_at.isoformat(),
'version': file_record.version,
'uploader': {
'id': file_record.uploader.id,
'username': file_record.uploader.username
} if file_record.uploader else None,
'project_id': file_record.project_id,
'versions': versions_data,
'content': content
}
return jsonify(file_data)
@app.route('/api/files/<int:file_id>', methods=['PUT'])
def update_file_route(file_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
file_record = File.query.get(file_id)
if not file_record:
return jsonify({'error': 'File not found'}), 404
# Check if user is a project member
user = User.query.get(session['user_id'])
project = Project.query.get(file_record.project_id)
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
commit_message = request.form.get('commit_message', '')
updated_file = update_file(file_id, file, user.id, commit_message)
if not updated_file:
return jsonify({'error': 'File update failed'}), 400
return jsonify({
'message': 'File updated successfully',
'file': {
'id': updated_file.id,
'filename': updated_file.filename,
'file_type': updated_file.file_type,
'updated_at': updated_file.updated_at.isoformat(),
'version': updated_file.version
}
})
@app.route('/api/projects/<int:project_id>/tasks', methods=['POST'])
def create_task(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
project = Project.query.get(project_id)
if not project:
return jsonify({'error': 'Project not found'}), 404
# Check if user is a project member
user = User.query.get(session['user_id'])
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403
data = request.json
# Parse due date if provided
due_date = None
if data.get('due_date'):
try:
due_date = datetime.datetime.fromisoformat(data['due_date'])
except ValueError:
return jsonify({'error': 'Invalid date format'}), 400
# Create task
new_task = Task(
title=data['title'],
description=data.get('description', ''),
status=data.get('status', 'todo'),
priority=data.get('priority', 'medium'),
due_date=due_date,
assigned_to=data.get('assigned_to'),
created_by=user.id,
project_id=project_id
)
db.session.add(new_task)
db.session.commit()
return jsonify({
'message': 'Task created successfully',
'task': {
'id': new_task.id,
'title': new_task.title,
'status': new_task.status,
'priority': new_task.priority
}
})
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
task = Task.query.get(task_id)
if not task:
return jsonify({'error': 'Task not found'}), 404
# Check if user is a project member
user = User.query.get(session['user_id'])
project = Project.query.get(task.project_id)
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403
data = request.json
# Update task fields
if 'title' in data:
task.title = data['title']
if 'description' in data:
task.description = data['description']
if 'status' in data:
task.status = data['status']
if 'priority' in data:
task.priority = data['priority']
if 'assigned_to' in data:
task.assigned_to = data['assigned_to']
if 'due_date' in data:
if data['due_date']:
try:
task.due_date = datetime.datetime.fromisoformat(data['due_date'])
except ValueError:
return jsonify({'error': 'Invalid date format'}), 400
else:
task.due_date = None
db.session.commit()
return jsonify({
'message': 'Task updated successfully',
'task': {
'id': task.id,
'title': task.title,
'status': task.status,
'priority': task.priority,
'assigned_to': task.assigned_to
}
})
@app.route('/api/projects/<int:project_id>/citations', methods=['POST'])
def add_citation(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
project = Project.query.get(project_id)
if not project:
return jsonify({'error': 'Project not found'}), 404
# Check if user is a project member
user = User.query.get(session['user_id'])
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403
data = request.json
# Create citation
new_citation = Citation(
title=data['title'],
authors=data.get('authors', ''),
journal=data.get('journal', ''),
year=data.get('year'),
doi=data.get('doi', ''),
url=data.get('url', ''),
abstract=data.get('abstract', ''),
notes=data.get('notes', ''),
added_by=user.id,
project_id=project_id
)
db.session.add(new_citation)
db.session.commit()
return jsonify({
'message': 'Citation added successfully',
'citation': {
'id': new_citation.id,
'title': new_citation.title,
'authors': new_citation.authors,
'year': new_citation.year
}
})
@app.route('/api/files/<int:file_id>/visualize', methods=['POST'])
def visualize_file(file_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
file_record = File.query.get(file_id)
if not file_record:
return jsonify({'error': 'File not found'}), 404
# Check if user is a project member
user = User.query.get(session['user_id'])
project = Project.query.get(file_record.project_id)
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403
data = request.json
vis_type = data.get('type', 'line')
options = data.get('options', {})
visualization = generate_visualization(file_record.file_path, file_record.file_type, vis_type, options)
if 'error' in visualization:
return jsonify({'error': visualization['error']}), 400
return jsonify(visualization)
# WebSocket handlers for real-time collaboration
@socketio.on('connect')
def handle_connect():
if 'user_id' not in session:
return False # Reject connection if not authenticated
logger.info(f"Client connected: {request.sid}")
@socketio.on('join')
def handle_join(data):
if 'user_id' not in session:
return
room = data.get('room')
if not room:
return
# Check if user has access to the project
project_id = int(room.replace('project_', ''))
user = User.query.get(session['user_id'])
project = Project.query.get(project_id)
if not project or (user not in project.members and project.owner_id != user.id):
return
join_room(room)
# Notify others in the room
emit('user_joined', {
'user_id': user.id,
'username': user.username
}, room=room, include_self=False)
logger.info(f"User {user.username} joined room {room}")
@socketio.on('leave')
def handle_leave(data):
if 'user_id' not in session:
return
room = data.get('room')
if not room:
return
leave_room(room)
# Notify others in the room
user = User.query.get(session['user_id'])
emit('user_left', {
'user_id': user.id,
'username': user.username
}, room=room, include_self=False)
logger.info(f"User {user.username} left room {room}")
@socketio.on('editor_update')
def handle_editor_update(data):
if 'user_id' not in session:
return
room = data.get('room')
file_id = data.get('file_id')
content = data.get('content')
cursor_position = data.get('cursor_position')
if not room or not file_id:
return
# Check if user has access to the file
user = User.query.get(session['user_id'])
file_record = File.query.get(file_id)
if not file_record:
return
project = Project.query.get(file_record.project_id)
if not project or (user not in project.members and project.owner_id != user.id):
return
# Forward update to other users in the room
emit('editor_update', {
'user_id': user.id,
'username': user.username,
'file_id': file_id,
'content': content,
'cursor_position': cursor_position,
'timestamp': datetime.datetime.utcnow().isoformat()
}, room=room, include_self=False)
@socketio.on('chat_message')
def handle_chat_message(data):
if 'user_id' not in session:
return
room = data.get('room')
message = data.get('message')
if not room or not message:
return
# Check if user has access to the project
project_id = int(room.replace('project_', ''))
user = User.query.get(session['user_id'])
project = Project.query.get(project_id)
if not project or (user not in project.members and project.owner_id != user.id):
return
# Save message to database
new_message = ChatMessage(
content=message,
user_id=user.id,
project_id=project_id
)
db.session.add(new_message)
db.session.commit()
# Forward message to all users in the room
emit('chat_message', {
'id': new_message.id,
'user_id': user.id,
'username': user.username,
'message': message,
'timestamp': new_message.timestamp.isoformat()
}, room=room)
@app.route('/api/projects/<int:project_id>/messages', methods=['GET'])
def get_chat_history(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401
project = Project.query.get(project_id)
if not project:
return jsonify({'error': 'Project not found'}), 404
# Check if user is a project member
user = User.query.get(session['user_id'])
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403
# Get chat messages
limit = request.args.get('limit', 50, type=int)
offset = request.args.get('offset', 0, type=int)
messages = ChatMessage.query.filter_by(project_id=project_id)\
.order_by(ChatMessage.timestamp.desc())\
.limit(limit).offset(offset).all()
messages_data = []
for message in reversed(messages): # Reverse to get chronological order
messages_data.append({
'id': message.id,
'user_id': message.user_id,
'username': message.user.username if message.user else 'Unknown',
'message': message.content,
'timestamp': message.timestamp.isoformat()
})
return jsonify(messages_data)
if __name__ == '__main__':
with app.app_context():
db.create_all()
socketio.run(app, debug=True)
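To exercise the real-time layer quickly, you can connect with the python-socketio client package. This is only a rough sketch: the event names (join, editor_update, chat_message) mirror the handlers above, while the localhost URL, room name, and file ID are assumptions for the example, and a real client would first log in through /api/login and reuse that session cookie, since the WebSocket handlers check the Flask session.
# collab_client_sketch.py - rough collaborator client (requires the python-socketio package)
import socketio

sio = socketio.Client()

@sio.on('editor_update')
def on_editor_update(data):
    # Fired when another collaborator edits the shared file
    print(f"{data['username']} edited file {data['file_id']} at {data['timestamp']}")

@sio.on('chat_message')
def on_chat_message(data):
    print(f"[{data['timestamp']}] {data['username']}: {data['message']}")

# NOTE: the server-side handlers check the Flask session, so in practice you
# would authenticate via /api/login first and send that session cookie here.
sio.connect('http://localhost:5000')  # assumed development address

# Join the room for project 1 (room names follow the project_<id> convention above)
sio.emit('join', {'room': 'project_1'})

# Broadcast an edit to everyone else in the room (file_id 1 is illustrative)
sio.emit('editor_update', {
    'room': 'project_1',
    'file_id': 1,
    'content': 'print("hello collaborators")',
    'cursor_position': {'line': 1, 'column': 0},
})
sio.wait()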
Resources:
- Socket.IO for Real-time Communication
- CodeMirror for Editor
- WebRTC for Video Conferencing
- Plotly for Interactive Visualizations
9. Personalized Learning Path Generator

Difficulty Level: Intermediate
Skills Developed: Recommendation Systems, Educational Technology, Graph Algorithms, User Experience Design
Project Description: Build a system that creates personalized learning paths for students based on their goals, current skills, and learning preferences. This application maps out educational resources, courses, and projects in an optimal sequence to help students efficiently acquire new skills.
Key Features:
- Skill assessment quizzes
- Learning style identification
- Custom learning path generation
- Resource recommendation from multiple sources
- Progress tracking and adaptive paths
- Peer learning group recommendations
Implementation Steps:
- Design the knowledge graph representing skills and resources
- Implement assessment algorithms
- Develop path-finding algorithms for learning sequences (see the prerequisite-ordering sketch after this list)
- Create recommendation engines for resources
- Build a user interface using React with a Python backend
- Implement progress tracking and adaptation systems
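Before wiring everything into Flask, it helps to prototype the core of the path-finding step on its own: ordering skills so that prerequisites always come first. Here is a tiny sketch with networkx; the skill names and edges are toy data for illustration, not the database schema used in the full implementation below.
# prerequisite_ordering_sketch.py - toy example of ordering skills by prerequisites
import networkx as nx

# A directed edge A -> B means "A is a prerequisite of B" (toy data)
graph = nx.DiGraph()
graph.add_edges_from([
    ("Python Basics", "Data Analysis"),
    ("Python Basics", "Machine Learning Basics"),
    ("Data Analysis", "Machine Learning Basics"),
    ("Machine Learning Basics", "Neural Networks"),
])

target = "Neural Networks"
# Everything the target depends on, directly or indirectly, plus the target itself
needed = nx.ancestors(graph, target) | {target}

# A topological sort of that subgraph is a valid study order
study_order = list(nx.topological_sort(graph.subgraph(needed)))
print(study_order)
# e.g. ['Python Basics', 'Data Analysis', 'Machine Learning Basics', 'Neural Networks']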
Sample Code Implementation:
# learning_path_generator.py
from flask import Flask, jsonify, request, session
from flask_cors import CORS
import pandas as pd
import numpy as np
import networkx as nx
import json
import os
import sqlite3
import hashlib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import datetime
import requests
from surprise import Dataset, Reader, SVD
from surprise.model_selection import train_test_split
from collections import defaultdict
app = Flask(__name__)
app.secret_key = 'your-secret-key'
CORS(app)
# Database setup
def init_db():
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
# Users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# User profiles table
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_profiles (
user_id INTEGER PRIMARY KEY,
learning_style TEXT,
interests TEXT,
goals TEXT,
background TEXT,
preferences TEXT,
last_updated TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Skills table
cursor.execute('''
CREATE TABLE IF NOT EXISTS skills (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
category TEXT,
description TEXT,
difficulty INTEGER
)
''')
# Resources table (courses, tutorials, projects, etc.)
cursor.execute('''
CREATE TABLE IF NOT EXISTS resources (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
resource_type TEXT,
url TEXT,
provider TEXT,
duration INTEGER,
difficulty INTEGER,
rating REAL,
tags TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Resource-Skill relationship
cursor.execute('''
CREATE TABLE IF NOT EXISTS resource_skills (
resource_id INTEGER,
skill_id INTEGER,
relationship_type TEXT,
weight REAL,
PRIMARY KEY (resource_id, skill_id),
FOREIGN KEY (resource_id) REFERENCES resources (id),
FOREIGN KEY (skill_id) REFERENCES skills (id)
)
''')
# Skill prerequisites
cursor.execute('''
CREATE TABLE IF NOT EXISTS skill_prerequisites (
skill_id INTEGER,
prerequisite_id INTEGER,
strength REAL,
PRIMARY KEY (skill_id, prerequisite_id),
FOREIGN KEY (skill_id) REFERENCES skills (id),
FOREIGN KEY (prerequisite_id) REFERENCES skills (id)
)
''')
# User skill assessments
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_skills (
user_id INTEGER,
skill_id INTEGER,
proficiency_level REAL,
assessment_date TIMESTAMP,
PRIMARY KEY (user_id, skill_id),
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (skill_id) REFERENCES skills (id)
)
''')
# Learning paths
cursor.execute('''
CREATE TABLE IF NOT EXISTS learning_paths (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
name TEXT,
description TEXT,
goal_skills TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Learning path steps
cursor.execute('''
CREATE TABLE IF NOT EXISTS path_steps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path_id INTEGER,
step_number INTEGER,
resource_id INTEGER,
status TEXT DEFAULT 'not_started',
completed_at TIMESTAMP,
FOREIGN KEY (path_id) REFERENCES learning_paths (id),
FOREIGN KEY (resource_id) REFERENCES resources (id)
)
''')
# Resource ratings by users
cursor.execute('''
CREATE TABLE IF NOT EXISTS resource_ratings (
user_id INTEGER,
resource_id INTEGER,
rating REAL,
review TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, resource_id),
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (resource_id) REFERENCES resources (id)
)
''')
# Progress tracking
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_progress (
user_id INTEGER,
resource_id INTEGER,
status TEXT,
progress REAL,
started_at TIMESTAMP,
completed_at TIMESTAMP,
notes TEXT,
PRIMARY KEY (user_id, resource_id),
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (resource_id) REFERENCES resources (id)
)
''')
# Learning groups
cursor.execute('''
CREATE TABLE IF NOT EXISTS learning_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
description TEXT,
skill_focus TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Group members
cursor.execute('''
CREATE TABLE IF NOT EXISTS group_members (
group_id INTEGER,
user_id INTEGER,
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (group_id, user_id),
FOREIGN KEY (group_id) REFERENCES learning_groups (id),
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
conn.commit()
conn.close()
# Initialize the database
init_db()
# Load sample data for demonstration
def load_sample_data():
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
# Check if data already exists
cursor.execute("SELECT COUNT(*) FROM skills")
if cursor.fetchone()[0] > 0:
conn.close()
return
# Sample skills
skills = [
(1, 'Python Basics', 'Programming', 'Fundamental Python programming concepts', 1),
(2, 'Data Structures', 'Programming', 'Arrays, lists, stacks, queues, trees, graphs', 2),
(3, 'Algorithms', 'Programming', 'Sorting, searching, dynamic programming', 3),
(4, 'Git', 'Tools', 'Version control with Git', 1),
(5, 'SQL', 'Database', 'Structured Query Language for databases', 2),
(6, 'Machine Learning Basics', 'Data Science', 'Introduction to ML concepts', 3),
(7, 'Neural Networks', 'Data Science', 'Deep learning and neural networks', 4),
(8, 'Web Development', 'Web', 'HTML, CSS, JavaScript basics', 2),
(9, 'Flask', 'Web', 'Python web framework', 3),
(10, 'React', 'Web', 'Frontend JavaScript library', 3),
(11, 'Data Analysis', 'Data Science', 'Data cleaning, exploration, visualization', 2),
(12, 'Natural Language Processing', 'Data Science', 'Text processing and analysis', 4),
(13, 'Computer Vision', 'Data Science', 'Image and video analysis', 4),
(14, 'DevOps Basics', 'Operations', 'CI/CD, deployment, containers', 3),
(15, 'Cloud Computing', 'Operations', 'AWS, Azure, GCP basics', 3)
]
cursor.executemany('''
INSERT OR IGNORE INTO skills (id, name, category, description, difficulty)
VALUES (?, ?, ?, ?, ?)
''', skills)
# Sample skill prerequisites
prerequisites = [
(6, 1, 0.9), # ML Basics requires Python Basics
(6, 2, 0.7), # ML Basics requires Data Structures
(6, 11, 0.8), # ML Basics requires Data Analysis
(7, 6, 0.9), # Neural Networks requires ML Basics
(9, 1, 0.8), # Flask requires Python Basics
(9, 8, 0.7), # Flask requires Web Development
(11, 1, 0.8), # Data Analysis requires Python Basics
(12, 6, 0.8), # NLP requires ML Basics
(12, 11, 0.7), # NLP requires Data Analysis
(13, 6, 0.8), # Computer Vision requires ML Basics
(14, 4, 0.6) # DevOps requires Git
]
cursor.executemany('''
INSERT OR IGNORE INTO skill_prerequisites (skill_id, prerequisite_id, strength)
VALUES (?, ?, ?)
''', prerequisites)
# Sample resources
resources = [
(1, 'Python for Beginners', 'Comprehensive Python course for beginners', 'course',
'https://example.com/python-beginners', 'Codecademy', 20, 1, 4.5, 'python,programming,beginners'),
(2, 'Data Structures in Python', 'Learn essential data structures', 'course',
'https://example.com/python-data-structures', 'Udacity', 30, 2, 4.2, 'python,data-structures,algorithms'),
(3, 'Introduction to Algorithms', 'Algorithm design and analysis', 'book',
'https://example.com/intro-algorithms', 'MIT Press', 60, 3, 4.8, 'algorithms,computer-science'),
(4, 'Git & GitHub Crash Course', 'Quick introduction to Git', 'tutorial',
'https://example.com/git-crash-course', 'YouTube', 2, 1, 4.7, 'git,github,version-control'),
(5, 'SQL for Data Analysis', 'Using SQL for data work', 'course',
'https://example.com/sql-data-analysis', 'DataCamp', 15, 2, 4.4, 'sql,database,data-analysis'),
(6, 'Machine Learning Foundations', 'Introduction to ML concepts and techniques', 'course',
'https://example.com/ml-foundations', 'Coursera', 40, 3, 4.6, 'machine-learning,data-science,python'),
(7, 'Deep Learning Specialization', 'Comprehensive deep learning course', 'specialization',
'https://example.com/deep-learning', 'Coursera', 80, 4, 4.9, 'deep-learning,neural-networks,ai'),
(8, 'Web Development Bootcamp', 'Complete web dev course', 'bootcamp',
'https://example.com/web-bootcamp', 'Udemy', 60, 2, 4.5, 'web,html,css,javascript'),
(9, 'Flask Web Development', 'Building web applications with Flask', 'course',
'https://example.com/flask-dev', 'Real Python', 25, 3, 4.3, 'flask,python,web-development'),
(10, 'React - The Complete Guide', 'Comprehensive React tutorial', 'course',
'https://example.com/react-guide', 'Udemy', 40, 3, 4.7, 'react,javascript,frontend')
]
cursor.executemany('''
INSERT OR IGNORE INTO resources (id, title, description, resource_type, url, provider, duration, difficulty, rating, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', resources)
# Sample resource-skill relationships
resource_skills = [
(1, 1, 'teaches', 0.9), # Python for Beginners teaches Python Basics
(2, 2, 'teaches', 0.9), # Data Structures in Python teaches Data Structures
(3, 3, 'teaches', 0.9), # Intro to Algorithms teaches Algorithms
(4, 4, 'teaches', 0.9), # Git Crash Course teaches Git
(5, 5, 'teaches', 0.9), # SQL for Data Analysis teaches SQL
(6, 6, 'teaches', 0.9), # ML Foundations teaches ML Basics
(7, 7, 'teaches', 0.9), # Deep Learning Specialization teaches Neural Networks
(8, 8, 'teaches', 0.9), # Web Development Bootcamp teaches Web Development
(9, 9, 'teaches', 0.9), # Flask Web Development teaches Flask
(10, 10, 'teaches', 0.9), # React Guide teaches React
(6, 1, 'requires', 0.7), # ML Foundations requires Python Basics
(6, 11, 'teaches', 0.5), # ML Foundations partially teaches Data Analysis
(7, 6, 'requires', 0.8), # Deep Learning requires ML Basics
(9, 1, 'requires', 0.7), # Flask Web Development requires Python Basics
(9, 8, 'requires', 0.5) # Flask Web Development somewhat requires Web Development
]
cursor.executemany('''
INSERT OR IGNORE INTO resource_skills (resource_id, skill_id, relationship_type, weight)
VALUES (?, ?, ?, ?)
''', resource_skills)
conn.commit()
conn.close()
# Load sample data
load_sample_data()
class SkillGraph:
"""Represents the knowledge graph of skills and their relationships."""
def __init__(self):
self.graph = nx.DiGraph()
self.load_from_database()
def load_from_database(self):
"""Load skills and prerequisites from the database."""
conn = sqlite3.connect('learning_path.db')
# Load skills
skills_df = pd.read_sql("SELECT id, name, category, difficulty FROM skills", conn)
# Add nodes to the graph
for _, skill in skills_df.iterrows():
self.graph.add_node(
skill['id'],
name=skill['name'],
category=skill['category'],
difficulty=skill['difficulty']
)
# Load prerequisites
prereqs_df = pd.read_sql(
"SELECT skill_id, prerequisite_id, strength FROM skill_prerequisites",
conn
)
# Add edges to the graph
for _, prereq in prereqs_df.iterrows():
self.graph.add_edge(
prereq['prerequisite_id'],
prereq['skill_id'],
weight=prereq['strength']
)
conn.close()
def get_prerequisites(self, skill_id, recursive=False):
"""Get prerequisites for a skill."""
if not recursive:
# Direct prerequisites only
return list(self.graph.predecessors(skill_id))
else:
# All ancestors (recursive prerequisites)
return list(nx.ancestors(self.graph, skill_id))
def get_dependent_skills(self, skill_id, recursive=False):
"""Get skills that depend on this skill."""
if not recursive:
# Direct dependents only
return list(self.graph.successors(skill_id))
else:
# All descendants (recursive dependents)
return list(nx.descendants(self.graph, skill_id))
def find_shortest_path(self, from_skill_id, to_skill_id):
"""Find the shortest path between two skills."""
try:
path = nx.shortest_path(self.graph, from_skill_id, to_skill_id)
return path
except (nx.NetworkXNoPath, nx.NodeNotFound):
return None
def find_learning_path(self, user_skills, target_skills):
"""
Find an optimal learning path from user's current skills to target skills.
Parameters:
- user_skills: dict mapping skill_id to proficiency level (0-1)
- target_skills: list of skill_ids to learn
Returns:
- ordered list of skill_ids representing the learning path
"""
# Create a subgraph of skills the user hasn't mastered yet
unmastered_graph = self.graph.copy()
# Remove skills the user has already mastered (proficiency > 0.7)
for skill_id, proficiency in user_skills.items():
if proficiency > 0.7 and skill_id in unmastered_graph:
unmastered_graph.remove_node(skill_id)
learning_path = []
visited = set()
# Find path to each target skill
for target in target_skills:
if target in visited:
continue
# Find prerequisites for the target skill
prereqs = self.get_prerequisites(target, recursive=True)
# Sort prerequisites by their topological order
if prereqs:
# Get subgraph containing only these prerequisites
prereq_graph = nx.subgraph(self.graph, prereqs + [target])
try:
# Get topological sort (respects prerequisites)
topo_sort = list(nx.topological_sort(prereq_graph))
# Filter out skills that are already mastered or visited
filtered_path = [s for s in topo_sort if (
s not in user_skills or user_skills[s] <= 0.7
) and s not in visited]
learning_path.extend(filtered_path)
visited.update(filtered_path)
except nx.NetworkXUnfeasible:
# Graph has cycles, use a different approach
for prereq in sorted(prereqs, key=lambda x: self.graph.nodes[x]['difficulty']):
if prereq not in visited and (prereq not in user_skills or user_skills[prereq] <= 0.7):
learning_path.append(prereq)
visited.add(prereq)
# Add the target skill itself if not mastered
if target not in visited and (target not in user_skills or user_skills[target] <= 0.7):
learning_path.append(target)
visited.add(target)
return learning_path
class ResourceRecommender:
"""Recommends learning resources based on skills and user preferences."""
def __init__(self):
self.vectorizer = TfidfVectorizer(stop_words='english')
self.load_resources()
def load_resources(self):
"""Load resources from the database."""
conn = sqlite3.connect('learning_path.db')
# Load resources
self.resources_df = pd.read_sql("""
SELECT r.id, r.title, r.description, r.resource_type, r.provider,
r.duration, r.difficulty, r.rating, r.tags, rs.skill_id, rs.relationship_type, rs.weight
FROM resources r
JOIN resource_skills rs ON r.id = rs.resource_id
""", conn)
# Prepare content for recommendation
self.resources_df['content'] = (
self.resources_df['title'] + ' ' +
self.resources_df['description'] + ' ' +
self.resources_df['tags']
)
# Compute TF-IDF matrix
if len(self.resources_df) > 0:
self.tfidf_matrix = self.vectorizer.fit_transform(self.resources_df['content'])
else:
self.tfidf_matrix = None
conn.close()
def recommend_for_skill(self, skill_id, user_id=None, n=3):
"""Recommend resources for learning a specific skill."""
# Filter resources that teach this skill
skill_resources = self.resources_df[
(self.resources_df['skill_id'] == skill_id) &
(self.resources_df['relationship_type'] == 'teaches')
]
if len(skill_resources) == 0:
return []
# If we have user_id, use their ratings for personalized recommendations
if user_id:
conn = sqlite3.connect('learning_path.db')
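# Note: user_id is interpolated with an f-string here only for brevity; in a real
# deployment, pass it through pd.read_sql's params argument to avoid SQL injection.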
user_ratings = pd.read_sql(f"""
SELECT resource_id, rating
FROM resource_ratings
WHERE user_id = {user_id}
""", conn)
conn.close()
if len(user_ratings) > 0:
# Use collaborative filtering if we have enough ratings
if len(user_ratings) >= 5:
return self._collaborative_recommendations(user_id, skill_id, n)
# Otherwise use a hybrid approach
return self._hybrid_recommendations(user_id, skill_id, n)
# If no user_id or not enough ratings, use content-based
return self._content_based_recommendations(skill_id, n)
def _content_based_recommendations(self, skill_id, n=3):
"""Generate content-based recommendations."""
# Filter resources for this skill
skill_resources = self.resources_df[
(self.resources_df['skill_id'] == skill_id) &
(self.resources_df['relationship_type'] == 'teaches')
]
if len(skill_resources) <= n:
return skill_resources.sort_values('rating', ascending=False)[['id', 'title', 'resource_type', 'difficulty', 'rating']].to_dict('records')
# Sort by a combination of rating and teaching weight
skill_resources['score'] = skill_resources['rating'] * skill_resources['weight']
return skill_resources.sort_values('score', ascending=False).head(n)[['id', 'title', 'resource_type', 'difficulty', 'rating']].to_dict('records')
def _collaborative_recommendations(self, user_id, skill_id, n=3):
"""Generate collaborative filtering recommendations."""
conn = sqlite3.connect('learning_path.db')
# Get all ratings
ratings_df = pd.read_sql("""
SELECT user_id, resource_id, rating
FROM resource_ratings
""", conn)
# Get resources for this skill
skill_resources = pd.read_sql(f"""
SELECT r.id, r.title, r.resource_type, r.difficulty, r.rating
FROM resources r
JOIN resource_skills rs ON r.id = rs.resource_id
WHERE rs.skill_id = {skill_id} AND rs.relationship_type = 'teaches'
""", conn)
conn.close()
if len(ratings_df) < 10 or len(skill_resources) == 0:
# Not enough data for collaborative filtering
return self._content_based_recommendations(skill_id, n)
# Prepare data for Surprise
reader = Reader(rating_scale=(1, 5))
data = Dataset.load_from_df(ratings_df[['user_id', 'resource_id', 'rating']], reader)
# Create a training set
trainset = data.build_full_trainset()
# Train the algorithm
algo = SVD()
algo.fit(trainset)
# Predict ratings for resources this user hasn't rated
user_ratings = ratings_df[ratings_df['user_id'] == user_id]['resource_id'].tolist()
skill_resource_ids = skill_resources['id'].tolist()
predictions = []
for resource_id in skill_resource_ids:
if resource_id not in user_ratings:
pred = algo.predict(user_id, resource_id)
predictions.append((resource_id, pred.est))
# Sort by predicted rating
predictions.sort(key=lambda x: x[1], reverse=True)
top_resources = [p[0] for p in predictions[:n]]
# Get resource details
recommended_resources = skill_resources[skill_resources['id'].isin(top_resources)]
# Re-sort by predicted rating order
recommended_resources = recommended_resources.set_index('id').loc[top_resources].reset_index()
return recommended_resources.to_dict('records')
def _hybrid_recommendations(self, user_id, skill_id, n=3):
"""Combine content-based and collaborative filtering approaches."""
# Start with content-based recommendations
content_recs = self._content_based_recommendations(skill_id, n)
# Get user preferences
conn = sqlite3.connect('learning_path.db')
user_profile = pd.read_sql(f"""
SELECT preferences
FROM user_profiles
WHERE user_id = {user_id}
""", conn)
user_ratings = pd.read_sql(f"""
SELECT r.resource_type, r.difficulty, r.provider, rr.rating
FROM resource_ratings rr
JOIN resources r ON rr.resource_id = r.id
WHERE rr.user_id = {user_id}
""", conn)
conn.close()
# If no user data, return content-based only
if len(user_profile) == 0 and len(user_ratings) == 0:
return content_recs
# Analyze user preferences
preferences = {}
if len(user_profile) > 0 and user_profile.iloc[0]['preferences']:
try:
preferences = json.loads(user_profile.iloc[0]['preferences'])
except (json.JSONDecodeError, TypeError):
preferences = {}
if len(user_ratings) > 0:
# Extract preferred resource types
type_ratings = user_ratings.groupby('resource_type')['rating'].mean()
difficulty_ratings = user_ratings.groupby('difficulty')['rating'].mean()
provider_ratings = user_ratings.groupby('provider')['rating'].mean()
preferences['preferred_types'] = type_ratings[type_ratings > 3.5].index.tolist()
preferences['preferred_difficulty'] = difficulty_ratings.idxmax()
preferences['preferred_providers'] = provider_ratings[provider_ratings > 4.0].index.tolist()
# Get additional resources for this skill
conn = sqlite3.connect('learning_path.db')
all_skill_resources = pd.read_sql(f"""
SELECT r.id, r.title, r.resource_type, r.difficulty, r.rating, r.provider
FROM resources r
JOIN resource_skills rs ON r.id = rs.resource_id
WHERE rs.skill_id = {skill_id} AND rs.relationship_type = 'teaches'
""", conn)
conn.close()
# Filter based on preferences
filtered_resources = all_skill_resources.copy()
if 'preferred_types' in preferences and preferences['preferred_types']:
type_mask = filtered_resources['resource_type'].isin(preferences['preferred_types'])
if type_mask.sum() > 0:
filtered_resources = filtered_resources[type_mask]
if 'preferred_providers' in preferences and preferences['preferred_providers']:
provider_mask = filtered_resources['provider'].isin(preferences['preferred_providers'])
if provider_mask.sum() > 0:
filtered_resources = filtered_resources[provider_mask]
if 'preferred_difficulty' in preferences:
diff = preferences['preferred_difficulty']
# Prefer resources with difficulty close to preferred level
filtered_resources['diff_distance'] = abs(filtered_resources['difficulty'] - diff)
filtered_resources = filtered_resources.sort_values('diff_distance')
# Combine with content-based recommendations
content_rec_ids = [r['id'] for r in content_recs]
additional_recs = filtered_resources[~filtered_resources['id'].isin(content_rec_ids)]
# Add more recommendations if needed
if len(content_recs) < n and len(additional_recs) > 0:
additional_recs = additional_recs.sort_values('rating', ascending=False)
num_additional = min(n - len(content_recs), len(additional_recs))
for i in range(num_additional):
rec = additional_recs.iloc[i]
content_recs.append({
'id': rec['id'],
'title': rec['title'],
'resource_type': rec['resource_type'],
'difficulty': rec['difficulty'],
'rating': rec['rating']
})
return content_recs
class LearningPathGenerator:
"""Main class for generating personalized learning paths."""
def __init__(self):
self.skill_graph = SkillGraph()
self.recommender = ResourceRecommender()
def assess_user_skills(self, user_id):
"""Get current user skill levels from the database."""
conn = sqlite3.connect('learning_path.db')
user_skills_df = pd.read_sql(f"""
SELECT skill_id, proficiency_level
FROM user_skills
WHERE user_id = {user_id}
""", conn)
conn.close()
user_skills = {}
for _, row in user_skills_df.iterrows():
user_skills[row['skill_id']] = row['proficiency_level']
return user_skills
def get_user_profile(self, user_id):
"""Get user profile with learning preferences."""
conn = sqlite3.connect('learning_path.db')
profile_df = pd.read_sql(f"""
SELECT learning_style, interests, goals, background, preferences
FROM user_profiles
WHERE user_id = {user_id}
""", conn)
conn.close()
if len(profile_df) == 0:
return {}
profile = profile_df.iloc[0].to_dict()
# Parse JSON fields
for field in ['interests', 'goals', 'preferences']:
if profile[field] and isinstance(profile[field], str):
try:
profile[field] = json.loads(profile[field])
except (json.JSONDecodeError, TypeError):
profile[field] = {}
return profile
def generate_learning_path(self, user_id, target_skills, name=None, description=None):
"""
Generate a personalized learning path for a user.
Parameters:
- user_id: ID of the user
- target_skills: List of skill IDs the user wants to learn
- name: Optional name for the learning path
- description: Optional description for the learning path
Returns:
- The created learning path with steps
"""
# Get user's current skills
user_skills = self.assess_user_skills(user_id)
# Get user's learning profile
user_profile = self.get_user_profile(user_id)
# Find optimal skill sequence
skill_sequence = self.skill_graph.find_learning_path(user_skills, target_skills)
if not skill_sequence:
return {"error": "Could not generate a learning path for the target skills"}
# Create a new learning path
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
path_name = name or f"Learning Path {datetime.datetime.now().strftime('%Y-%m-%d')}"
path_desc = description or f"Path to learn {', '.join(map(str, target_skills))}"
target_skills_json = json.dumps(target_skills)
cursor.execute("""
INSERT INTO learning_paths (user_id, name, description, goal_skills)
VALUES (?, ?, ?, ?)
""", (user_id, path_name, path_desc, target_skills_json))
path_id = cursor.lastrowid
# Generate resource recommendations for each skill
step_number = 1
path_steps = []
for skill_id in skill_sequence:
# Get skill info
cursor.execute("SELECT name FROM skills WHERE id = ?", (skill_id,))
skill_name = cursor.fetchone()[0]
# Get recommended resources
resources = self.recommender.recommend_for_skill(skill_id, user_id, n=2)
if resources:
for resource in resources:
# Add as a path step
cursor.execute("""
INSERT INTO path_steps (path_id, step_number, resource_id, status)
VALUES (?, ?, ?, 'not_started')
""", (path_id, step_number, resource['id']))
# Get resource details
cursor.execute("""
SELECT title, resource_type, difficulty, duration, url
FROM resources
WHERE id = ?
""", (resource['id'],))
resource_details = cursor.fetchone()
title, resource_type, difficulty, duration, url = resource_details
path_steps.append({
'step_number': step_number,
'skill_id': skill_id,
'skill_name': skill_name,
'resource_id': resource['id'],
'resource_title': title,
'resource_type': resource_type,
'difficulty': difficulty,
'duration': duration,
'url': url,
'status': 'not_started'
})
step_number += 1
conn.commit()
conn.close()
return {
'path_id': path_id,
'name': path_name,
'description': path_desc,
'target_skills': target_skills,
'steps': path_steps
}
def recommend_learning_groups(self, user_id, skill_id=None):
"""Recommend learning groups for a user based on skills and interests."""
conn = sqlite3.connect('learning_path.db')
# Get all learning groups
groups_df = pd.read_sql("""
SELECT lg.id, lg.name, lg.description, lg.skill_focus, COUNT(gm.user_id) as member_count
FROM learning_groups lg
LEFT JOIN group_members gm ON lg.id = gm.group_id
GROUP BY lg.id
""", conn)
# If no groups exist, return empty list
if len(groups_df) == 0:
conn.close()
return []
# If a specific skill is provided, filter for that skill
if skill_id:
skill_focused_groups = groups_df[groups_df['skill_focus'].apply(
lambda x: str(skill_id) in x.split(',') if x else False
)]
if len(skill_focused_groups) > 0:
recommended_groups = skill_focused_groups.sort_values('member_count', ascending=False).head(3)
conn.close()
return recommended_groups.to_dict('records')
# Get user's interests and skills
user_profile = self.get_user_profile(user_id)
user_skills = self.assess_user_skills(user_id)
# Get user's learning paths
paths_df = pd.read_sql(f"""
SELECT goal_skills
FROM learning_paths
WHERE user_id = {user_id}
""", conn)
conn.close()
# Extract skills of interest
target_skills = []
if 'interests' in user_profile and user_profile['interests']:
# Check if interests include skills
interests = user_profile['interests']
if isinstance(interests, list):
# Assume these are skill names or IDs
target_skills.extend(interests)
if len(paths_df) > 0:
# Extract target skills from learning paths
for _, path in paths_df.iterrows():
if path['goal_skills']:
try:
goals = json.loads(path['goal_skills'])
if isinstance(goals, list):
target_skills.extend(goals)
except Exception:
pass
# If we have target skills, recommend groups based on those
if target_skills:
# Convert to set to remove duplicates
target_skills = set(target_skills)
# Filter groups by skill focus
skill_matches = []
for _, group in groups_df.iterrows():
if not group['skill_focus']:
continue
group_skills = set(str(group['skill_focus']).split(','))
match_score = len(target_skills.intersection(group_skills))
if match_score > 0:
skill_matches.append((group, match_score))
if skill_matches:
# Sort by match score and then by member count
skill_matches.sort(key=lambda x: (-x[1], -x[0]['member_count']))
recommended_groups = [match[0].to_dict() for match in skill_matches[:3]]  # convert pandas rows to dicts so they can be serialized with jsonify
return recommended_groups
# If no matches or no target skills, return most active groups
return groups_df.sort_values('member_count', ascending=False).head(3).to_dict('records')
def update_learning_path(self, path_id, updates):
"""Update a learning path based on user progress and feedback."""
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
# Get current path information
cursor.execute("""
SELECT lp.user_id, lp.goal_skills, ps.id, ps.resource_id, ps.status
FROM learning_paths lp
JOIN path_steps ps ON lp.id = ps.path_id
WHERE lp.id = ?
""", (path_id,))
path_data = cursor.fetchall()
if not path_data:
conn.close()
return {"error": "Learning path not found"}
user_id = path_data[0][0]
goal_skills = json.loads(path_data[0][1]) if path_data[0][1] else []
# Update step statuses
for step_update in updates.get('step_updates', []):
step_id = step_update.get('step_id')
new_status = step_update.get('status')
if step_id and new_status:
cursor.execute("""
UPDATE path_steps
SET status = ?, completed_at = ?
WHERE id = ? AND path_id = ?
""", (
new_status,
datetime.datetime.now() if new_status == 'completed' else None,
step_id, path_id
))
# If user completed resources, update their skill proficiency
completed_resources = [u['step_id'] for u in updates.get('step_updates', [])
if u.get('status') == 'completed']
if completed_resources:
# Get resource-skill relationships for completed resources
placeholders = ','.join(['?'] * len(completed_resources))
cursor.execute(f"""
SELECT rs.skill_id, rs.weight, ps.resource_id
FROM path_steps ps
JOIN resource_skills rs ON ps.resource_id = rs.resource_id
WHERE ps.id IN ({placeholders}) AND rs.relationship_type = 'teaches'
""", completed_resources)
skill_progress = {}
for skill_id, weight, resource_id in cursor.fetchall():
if skill_id not in skill_progress:
skill_progress[skill_id] = 0
# Add weighted contribution to skill proficiency
skill_progress[skill_id] += float(weight) * 0.2 # Each resource contributes up to 20% progress
# Update user skills
for skill_id, progress in skill_progress.items():
# Get current proficiency
cursor.execute("""
SELECT proficiency_level
FROM user_skills
WHERE user_id = ? AND skill_id = ?
""", (user_id, skill_id))
result = cursor.fetchone()
current_proficiency = result[0] if result else 0
# Calculate new proficiency (capped at 1.0)
new_proficiency = min(1.0, current_proficiency + progress)
# Update or insert
if result:
cursor.execute("""
UPDATE user_skills
SET proficiency_level = ?, assessment_date = ?
WHERE user_id = ? AND skill_id = ?
""", (new_proficiency, datetime.datetime.now(), user_id, skill_id))
else:
cursor.execute("""
INSERT INTO user_skills (user_id, skill_id, proficiency_level, assessment_date)
VALUES (?, ?, ?, ?)
""", (user_id, skill_id, new_proficiency, datetime.datetime.now()))
# Check if user has achieved their goals
if goal_skills:
# Get user's current skills
cursor.execute("""
SELECT skill_id, proficiency_level
FROM user_skills
WHERE user_id = ? AND skill_id IN ({})
""".format(','.join(['?'] * len(goal_skills))), [user_id] + goal_skills)
achieved_skills = {}
for skill_id, proficiency in cursor.fetchall():
achieved_skills[skill_id] = proficiency >= 0.7 # Skill is considered achieved if proficiency >= 70%
# Check if all goals are achieved
all_achieved = all(achieved_skills.get(skill_id, False) for skill_id in goal_skills)
if all_achieved:
# Mark path as completed
cursor.execute("""
UPDATE learning_paths
SET completed_at = ?
WHERE id = ?
""", (datetime.datetime.now(), path_id))
# Suggest a new path for skills that build on the achieved ones
next_level_skills = []
for skill_id in goal_skills:
next_skills = self.skill_graph.get_dependent_skills(skill_id)
for next_skill in next_skills:
# Check if user already mastered this skill
cursor.execute("""
SELECT proficiency_level
FROM user_skills
WHERE user_id = ? AND skill_id = ?
""", (user_id, next_skill))
result = cursor.fetchone()
if not result or result[0] < 0.7:
next_level_skills.append(next_skill)
# Remove duplicates
next_level_skills = list(set(next_level_skills))
conn.commit()
conn.close()
if next_level_skills:
# Generate a suggested next path
suggested_path = self.generate_learning_path(
user_id=user_id,
target_skills=next_level_skills[:3], # Take up to 3 skills
name="Next Level Path",
description="Suggested path to advance your skills"
)
return {
'message': 'Learning path completed successfully',
'all_goals_achieved': True,
'suggested_next_path': suggested_path
}
return {
'message': 'Learning path completed successfully',
'all_goals_achieved': True
}
conn.commit()
conn.close()
return {
'message': 'Learning path updated successfully',
'all_goals_achieved': False
}
# API Routes
@app.route('/api/login', methods=['POST'])
def login():
data = request.json
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': 'Username and password required'}), 400
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
# Get user
cursor.execute("SELECT id, password_hash FROM users WHERE username = ?", (username,))
user = cursor.fetchone()
conn.close()
if not user or user[1] != hashlib.sha256(password.encode()).hexdigest():
return jsonify({'error': 'Invalid username or password'}), 401
session['user_id'] = user[0]
return jsonify({'message': 'Login successful', 'user_id': user[0]})
@app.route('/api/register', methods=['POST'])
def register():
data = request.json
username = data.get('username')
email = data.get('email')
password = data.get('password')
if not username or not email or not password:
return jsonify({'error': 'All fields required'}), 400
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
# Check if user exists
cursor.execute("SELECT id FROM users WHERE username = ? OR email = ?", (username, email))
if cursor.fetchone():
conn.close()
return jsonify({'error': 'Username or email already exists'}), 400
# Create user
password_hash = hashlib.sha256(password.encode()).hexdigest()
cursor.execute("""
INSERT INTO users (username, email, password_hash)
VALUES (?, ?, ?)
""", (username, email, password_hash))
user_id = cursor.lastrowid
conn.commit()
conn.close()
session['user_id'] = user_id
return jsonify({'message': 'Registration successful', 'user_id': user_id})
@app.route('/api/profile', methods=['GET', 'POST'])
def user_profile():
if 'user_id' not in session:
return jsonify({'error': 'Not logged in'}), 401
user_id = session['user_id']
if request.method == 'GET':
conn = sqlite3.connect('learning_path.db')
# Get user info
cursor = conn.cursor()
cursor.execute("SELECT username, email FROM users WHERE id = ?", (user_id,))
user_data = cursor.fetchone()
if not user_data:
conn.close()
return jsonify({'error': 'User not found'}), 404
# Get profile
cursor.execute("""
SELECT learning_style, interests, goals, background, preferences
FROM user_profiles
WHERE user_id = ?
""", (user_id,))
profile_data = cursor.fetchone()
# Get skills
cursor.execute("""
SELECT us.skill_id, s.name, us.proficiency_level
FROM user_skills us
JOIN skills s ON us.skill_id = s.id
WHERE us.user_id = ?
""", (user_id,))
skills_data = cursor.fetchall()
conn.close()
# Format response
profile = {
'username': user_data[0],
'email': user_data[1],
'profile': {
'learning_style': profile_data[0] if profile_data else None,
'interests': json.loads(profile_data[1]) if profile_data and profile_data[1] else [],
'goals': json.loads(profile_data[2]) if profile_data and profile_data[2] else [],
'background': profile_data[3] if profile_data else None,
'preferences': json.loads(profile_data[4]) if profile_data and profile_data[4] else {}
},
'skills': [
{'id': s[0], 'name': s[1], 'proficiency': s[2]}
for s in skills_data
]
}
return jsonify(profile)
elif request.method == 'POST':
data = request.json
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
# Check if profile exists
cursor.execute("SELECT user_id FROM user_profiles WHERE user_id = ?", (user_id,))
profile_exists = cursor.fetchone() is not None
# Prepare profile data
learning_style = data.get('learning_style')
interests = json.dumps(data.get('interests', []))
goals = json.dumps(data.get('goals', []))
background = data.get('background')
preferences = json.dumps(data.get('preferences', {}))
# Update or insert profile
if profile_exists:
cursor.execute("""
UPDATE user_profiles
SET learning_style = ?, interests = ?, goals = ?, background = ?, preferences = ?, last_updated = ?
WHERE user_id = ?
""", (learning_style, interests, goals, background, preferences, datetime.datetime.now(), user_id))
else:
cursor.execute("""
INSERT INTO user_profiles (user_id, learning_style, interests, goals, background, preferences, last_updated)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (user_id, learning_style, interests, goals, background, preferences, datetime.datetime.now()))
# Handle skills update if provided
if 'skills' in data:
skills = data['skills']
for skill in skills:
skill_id = skill.get('id')
proficiency = skill.get('proficiency', 0)
cursor.execute("""
INSERT OR REPLACE INTO user_skills (user_id, skill_id, proficiency_level, assessment_date)
VALUES (?, ?, ?, ?)
""", (user_id, skill_id, proficiency, datetime.datetime.now()))
conn.commit()
conn.close()
return jsonify({'message': 'Profile updated successfully'})
@app.route('/api/skills', methods=['GET'])
def get_skills():
conn = sqlite3.connect('learning_path.db')
skills_df = pd.read_sql("""
SELECT id, name, category, description, difficulty
FROM skills
ORDER BY category, difficulty, name
""", conn)
conn.close()
# Group by category
grouped_skills = {}
for _, skill in skills_df.iterrows():
category = skill['category']
if category not in grouped_skills:
grouped_skills[category] = []
grouped_skills[category].append({
'id': skill['id'],
'name': skill['name'],
'description': skill['description'],
'difficulty': skill['difficulty']
})
return jsonify(grouped_skills)
@app.route('/api/skills/<int:skill_id>/prerequisites', methods=['GET'])
def get_skill_prerequisites(skill_id):
# Create skill graph
skill_graph = SkillGraph()
# Get prerequisites
prereqs = skill_graph.get_prerequisites(skill_id)
# Get skill details
conn = sqlite3.connect('learning_path.db')
if prereqs:
prereqs_str = ','.join('?' * len(prereqs))
prereqs_df = pd.read_sql(f"""
SELECT id, name, category, difficulty
FROM skills
WHERE id IN ({prereqs_str})
""", conn, params=prereqs)
prereqs_data = prereqs_df.to_dict('records')
else:
prereqs_data = []
# Get skill information
skill_df = pd.read_sql("""
SELECT id, name, category, description, difficulty
FROM skills
WHERE id = ?
""", conn, params=(skill_id,))
conn.close()
if len(skill_df) == 0:
return jsonify({'error': 'Skill not found'}), 404
skill_data = skill_df.iloc[0].to_dict()
skill_data['prerequisites'] = prereqs_data
return jsonify(skill_data)
@app.route('/api/learning-paths', methods=['GET', 'POST'])
def learning_paths():
if 'user_id' not in session:
return jsonify({'error': 'Not logged in'}), 401
user_id = session['user_id']
if request.method == 'GET':
conn = sqlite3.connect('learning_path.db')
paths_df = pd.read_sql(f"""
SELECT id, name, description, goal_skills, created_at, updated_at
FROM learning_paths
WHERE user_id = {user_id}
ORDER BY updated_at DESC
""", conn)
# Get completion stats for each path
paths_data = []
for _, path in paths_df.iterrows():
path_id = path['id']
# Get steps count and completion
steps_df = pd.read_sql(f"""
SELECT status, COUNT(*) as count
FROM path_steps
WHERE path_id = {path_id}
GROUP BY status
""", conn)
total_steps = steps_df['count'].sum()
completed_steps = steps_df[steps_df['status'] == 'completed']['count'].sum() if 'completed' in steps_df['status'].values else 0
# Parse goal skills
goal_skills = []
if path['goal_skills']:
try:
skill_ids = json.loads(path['goal_skills'])
# Get skill names
if skill_ids:
skills_str = ','.join('?' * len(skill_ids))
skills_df = pd.read_sql(f"""
SELECT id, name
FROM skills
WHERE id IN ({skills_str})
""", conn, params=skill_ids)
goal_skills = skills_df.to_dict('records')
except Exception:
goal_skills = []
paths_data.append({
'id': path['id'],
'name': path['name'],
'description': path['description'],
'goal_skills': goal_skills,
'created_at': path['created_at'],
'updated_at': path['updated_at'],
'total_steps': total_steps,
'completed_steps': completed_steps,
'progress': (completed_steps / total_steps * 100) if total_steps > 0 else 0
})
conn.close()
return jsonify(paths_data)
elif request.method == 'POST':
data = request.json
target_skills = data.get('target_skills', [])
name = data.get('name')
description = data.get('description')
if not target_skills:
return jsonify({'error': 'Target skills required'}), 400
# Generate learning path
generator = LearningPathGenerator()
path = generator.generate_learning_path(user_id, target_skills, name, description)
return jsonify(path)
@app.route('/api/learning-paths/<int:path_id>', methods=['GET', 'PUT'])
def learning_path_detail(path_id):
if 'user_id' not in session:
return jsonify({'error': 'Not logged in'}), 401
user_id = session['user_id']
conn = sqlite3.connect('learning_path.db')
# Check if path exists and belongs to user
cursor = conn.cursor()
cursor.execute("""
SELECT id
FROM learning_paths
WHERE id = ? AND user_id = ?
""", (path_id, user_id))
path_exists = cursor.fetchone() is not None
if not path_exists:
conn.close()
return jsonify({'error': 'Learning path not found'}), 404
if request.method == 'GET':
# Get path details
path_df = pd.read_sql(f"""
SELECT id, name, description, goal_skills, created_at, updated_at
FROM learning_paths
WHERE id = {path_id}
""", conn)
# Get steps
steps_df = pd.read_sql(f"""
SELECT ps.id, ps.step_number, ps.resource_id, ps.status, ps.completed_at,
r.title, r.description, r.resource_type, r.difficulty, r.duration, r.url, r.provider,
rs.skill_id
FROM path_steps ps
JOIN resources r ON ps.resource_id = r.id
LEFT JOIN resource_skills rs ON r.id = rs.resource_id AND rs.relationship_type = 'teaches'
WHERE ps.path_id = {path_id}
ORDER BY ps.step_number
""", conn)
# Get skill information for goal skills and step skills
goal_skills = json.loads(path_df.iloc[0]['goal_skills']) if path_df.iloc[0]['goal_skills'] else []
step_skills = steps_df['skill_id'].dropna().unique().tolist()
all_skills = list(set(goal_skills + step_skills))
if all_skills:
skills_str = ','.join('?' * len(all_skills))
skills_df = pd.read_sql(f"""
SELECT id, name, category
FROM skills
WHERE id IN ({skills_str})
""", conn, params=all_skills)
# Create skill lookup
skill_lookup = {row['id']: row.to_dict() for _, row in skills_df.iterrows()}
else:
skill_lookup = {}
# Format steps with skill info
steps_data = []
for _, step in steps_df.iterrows():
skill_id = step['skill_id']
skill_info = skill_lookup.get(skill_id, {}) if skill_id else {}
steps_data.append({
'id': step['id'],
'step_number': step['step_number'],
'resource': {
'id': step['resource_id'],
'title': step['title'],
'description': step['description'],
'type': step['resource_type'],
'difficulty': step['difficulty'],
'duration': step['duration'],
'url': step['url'],
'provider': step['provider']
},
'skill': skill_info,
'status': step['status'],
'completed_at': step['completed_at']
})
# Format goal skills
goal_skills_data = [skill_lookup.get(skill_id, {'id': skill_id}) for skill_id in goal_skills]
path_data = {
'id': path_df.iloc[0]['id'],
'name': path_df.iloc[0]['name'],
'description': path_df.iloc[0]['description'],
'goal_skills': goal_skills_data,
'created_at': path_df.iloc[0]['created_at'],
'updated_at': path_df.iloc[0]['updated_at'],
'steps': steps_data
}
conn.close()
return jsonify(path_data)
elif request.method == 'PUT':
data = request.json
# Update learning path
generator = LearningPathGenerator()
result = generator.update_learning_path(path_id, data)
return jsonify(result)
@app.route('/api/resources/<int:resource_id>/rate', methods=['POST'])
def rate_resource(resource_id):
if 'user_id' not in session:
return jsonify({'error': 'Not logged in'}), 401
user_id = session['user_id']
data = request.json
rating = data.get('rating')
review = data.get('review', '')
if not rating or not isinstance(rating, (int, float)) or rating < 1 or rating > 5:
return jsonify({'error': 'Invalid rating'}), 400
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
# Check if resource exists
cursor.execute("SELECT id FROM resources WHERE id = ?", (resource_id,))
if not cursor.fetchone():
conn.close()
return jsonify({'error': 'Resource not found'}), 404
# Add or update rating
cursor.execute("""
INSERT OR REPLACE INTO resource_ratings (user_id, resource_id, rating, review, timestamp)
VALUES (?, ?, ?, ?, ?)
""", (user_id, resource_id, rating, review, datetime.datetime.now()))
# Update average rating in resources table
cursor.execute("""
UPDATE resources
SET rating = (
SELECT AVG(rating)
FROM resource_ratings
WHERE resource_id = ?
)
WHERE id = ?
""", (resource_id, resource_id))
conn.commit()
conn.close()
return jsonify({'message': 'Rating submitted successfully'})
@app.route('/api/learning-groups/join/<int:group_id>', methods=['POST'])
def join_group(group_id):
if 'user_id' not in session:
return jsonify({'error': 'Not logged in'}), 401
user_id = session['user_id']
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
# Check if group exists
cursor.execute("SELECT id FROM learning_groups WHERE id = ?", (group_id,))
if not cursor.fetchone():
conn.close()
return jsonify({'error': 'Group not found'}), 404
# Check if user is already a member
cursor.execute("""
SELECT group_id
FROM group_members
WHERE group_id = ? AND user_id = ?
""", (group_id, user_id))
if cursor.fetchone():
conn.close()
return jsonify({'error': 'Already a member of this group'}), 400
# Add user to group
cursor.execute("""
INSERT INTO group_members (group_id, user_id)
VALUES (?, ?)
""", (group_id, user_id))
conn.commit()
conn.close()
return jsonify({'message': 'Successfully joined the group'})
@app.route('/api/skill-assessment', methods=['POST'])
def assess_skills():
if 'user_id' not in session:
return jsonify({'error': 'Not logged in'}), 401
user_id = session['user_id']
data = request.json
if not data or not isinstance(data, list):
return jsonify({'error': 'Invalid data format'}), 400
conn = sqlite3.connect('learning_path.db')
cursor = conn.cursor()
for assessment in data:
skill_id = assessment.get('skill_id')
proficiency = assessment.get('proficiency')
if not skill_id or not isinstance(proficiency, (int, float)):
continue
# Normalize proficiency to 0-1 range
proficiency = max(0, min(1, proficiency))
# Update or insert skill assessment
cursor.execute("""
INSERT OR REPLACE INTO user_skills (user_id, skill_id, proficiency_level, assessment_date)
VALUES (?, ?, ?, ?)
""", (user_id, skill_id, proficiency, datetime.datetime.now()))
conn.commit()
conn.close()
return jsonify({'message': 'Skill assessment saved successfully'})
@app.route('/api/recommendations/skills', methods=['GET'])
def recommend_skills():
if 'user_id' not in session:
return jsonify({'error': 'Not logged in'}), 401
user_id = session['user_id']
# Get user's current skills
conn = sqlite3.connect('learning_path.db')
user_skills_df = pd.read_sql(f"""
SELECT skill_id, proficiency_level
FROM user_skills
WHERE user_id = {user_id}
""", conn)
# If user has no skills yet, recommend beginner skills
if len(user_skills_df) == 0:
beginner_skills = pd.read_sql("""
SELECT id, name, category, description
FROM skills
WHERE difficulty = 1
LIMIT 5
""", conn)
conn.close()
return jsonify({
'beginner_recommendations': beginner_skills.to_dict('records'),
'next_level_recommendations': [],
'based_on_interests': []
})
# Create skill graph
skill_graph = SkillGraph()
# Find mastered skills (proficiency > 0.7)
mastered_skills = user_skills_df[user_skills_df['proficiency_level'] > 0.7]['skill_id'].tolist()
# Get next level skills based on mastered skills
next_level_skills = []
for skill_id in mastered_skills:
dependents = skill_graph.get_dependent_skills(skill_id)
next_level_skills.extend(dependents)
# Remove duplicates and already mastered skills
next_level_skills = list(set(next_level_skills) - set(mastered_skills))
# Get user's learning profile and interests
user_profile = pd.read_sql(f"""
SELECT interests
FROM user_profiles
WHERE user_id = {user_id}
""", conn)
interest_recommendations = []
if len(user_profile) > 0 and user_profile.iloc[0]['interests']:
try:
interests = json.loads(user_profile.iloc[0]['interests'])
# Query skills related to interests
skills_df = pd.read_sql("""
SELECT id, name, category, description
FROM skills
""", conn)
# Simple matching based on keywords in skill names and descriptions
if interests and isinstance(interests, list):
for interest in interests:
interest_lower = interest.lower()
for _, skill in skills_df.iterrows():
name_match = interest_lower in skill['name'].lower()
desc_match = skill['description'] and interest_lower in skill['description'].lower()
category_match = skill['category'] and interest_lower in skill['category'].lower()
if (name_match or desc_match or category_match) and skill['id'] not in mastered_skills:
interest_recommendations.append(skill['id'])
except Exception:
pass
# Get skill details
if next_level_skills:
next_skills_str = ','.join('?' * len(next_level_skills))
next_level_df = pd.read_sql(f"""
SELECT id, name, category, description
FROM skills
WHERE id IN ({next_skills_str})
""", conn, params=next_level_skills)
next_level_data = next_level_df.to_dict('records')
else:
next_level_data = []
if interest_recommendations:
interest_skills_str = ','.join('?' * len(interest_recommendations))
interest_df = pd.read_sql(f"""
SELECT id, name, category, description
FROM skills
WHERE id IN ({interest_skills_str})
""", conn, params=interest_recommendations)
interest_data = interest_df.to_dict('records')
else:
interest_data = []
# Get some beginner-friendly skills that user hasn't mastered yet
all_recommended = set(next_level_skills + interest_recommendations + mastered_skills)
beginner_df = pd.read_sql(f"""
SELECT id, name, category, description
FROM skills
WHERE difficulty = 1 AND id NOT IN ({','.join('?' * len(all_recommended))})
LIMIT 3
""", conn, params=list(all_recommended)) if all_recommended else pd.read_sql("""
SELECT id, name, category, description
FROM skills
WHERE difficulty = 1
LIMIT 3
""", conn)
conn.close()
return jsonify({
'beginner_recommendations': beginner_df.to_dict('records'),
'next_level_recommendations': next_level_data[:5], # Limit to 5
'based_on_interests': interest_data[:5] # Limit to 5
})
@app.route('/api/recommendations/learning-groups', methods=['GET'])
def recommend_groups():
if 'user_id' not in session:
return jsonify({'error': 'Not logged in'}), 401
user_id = session['user_id']
# Get skill_id from query params (optional)
skill_id = request.args.get('skill_id', type=int)
# Get group recommendations
generator = LearningPathGenerator()
groups = generator.recommend_learning_groups(user_id, skill_id)
return jsonify({'recommended_groups': groups})
if __name__ == '__main__':
app.run(debug=True)
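Once the server is running, these endpoints can be exercised from any HTTP client. Below is a minimal usage sketch (not part of the application itself): it assumes the Flask app is running locally on the default port 5000, and the skill IDs shown are placeholders for rows in your skills table.
# learning_path_client.py
import requests

BASE_URL = "http://localhost:5000"
session = requests.Session()  # keeps the Flask session cookie between calls

# Create an account (this also logs the session in)
session.post(f"{BASE_URL}/api/register", json={
    "username": "student1",
    "email": "student1@example.com",
    "password": "change-me"
})

# Record a quick self-assessment for two skills
session.post(f"{BASE_URL}/api/skill-assessment", json=[
    {"skill_id": 1, "proficiency": 0.4},
    {"skill_id": 2, "proficiency": 0.2}
])

# Ask for a personalized learning path toward a target skill
response = session.post(f"{BASE_URL}/api/learning-paths", json={
    "target_skills": [5],
    "name": "Data Analysis Path"
})
print(response.json())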
Resources:
10. Environmental Monitoring System with IoT Integration

Difficulty Level: Intermediate to Advanced
Skills Developed: IoT, Sensor Programming, Data Analysis, Cloud Integration, Visualization
Project Description: Create an environmental monitoring system that collects data from distributed sensors (temperature, humidity, air quality, noise levels) across a college campus. This system analyzes patterns, identifies anomalies, and provides insights to improve campus sustainability and student well-being.
Key Features:
- Sensor data collection and storage
- Real-time monitoring dashboard
- Anomaly detection algorithms
- Environmental trend analysis
- Notification system for threshold violations
- API integration for weather data correlation
Implementation Steps:
- Set up sensors (or simulate sensor data; a simple client sketch follows this list)
- Create a data collection backend with Flask or Django
- Implement a database for storing sensor readings
- Develop analysis algorithms for pattern detection
- Build visualization dashboards using Plotly Dash
- Implement notification systems using webhooks or SMS
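Before wiring up physical hardware, the data-collection step can be prototyped with a simulated sensor client, as shown in the sketch below. The base URL, sensor ID, and reading values are assumptions; adjust them to match how you run the server and seed the database from the sample code that follows.
# simulated_sensor_client.py
import datetime
import random
import time

import requests

BASE_URL = "http://localhost:5000"  # change to wherever the monitoring server is reachable
SENSOR_ID = 1  # a temperature/humidity sensor from the sample data

# Post a handful of fake readings, one per minute
for _ in range(5):
    reading = {
        "timestamp": datetime.datetime.now().isoformat(),
        "temperature": round(random.uniform(19.0, 26.0), 1),
        "humidity": round(random.uniform(35.0, 55.0), 1),
        "battery_level": round(random.uniform(80.0, 100.0), 1),
    }
    response = requests.post(f"{BASE_URL}/api/sensors/{SENSOR_ID}/readings", json=reading)
    print(response.status_code, response.json())
    time.sleep(60)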
Sample Code Implementation:
# environmental_monitoring.py
import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
import sqlite3
import datetime
import time
import json
import os
import requests
import smtplib
from email.message import EmailMessage
import threading
import logging
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import random
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("environment_monitor.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Environment data model
class EnvironmentDatabase:
def __init__(self, db_path='environment_data.db'):
self.db_path = db_path
self.init_db()
def init_db(self):
"""Initialize the SQLite database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create sensor locations table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sensor_locations (
id INTEGER PRIMARY KEY,
location_name TEXT NOT NULL,
building TEXT,
floor INTEGER,
room TEXT,
latitude REAL,
longitude REAL,
indoor BOOLEAN,
notes TEXT
)
''')
# Create sensors table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sensors (
id INTEGER PRIMARY KEY,
sensor_type TEXT NOT NULL,
model TEXT,
manufacturer TEXT,
install_date DATE,
last_maintenance DATE,
location_id INTEGER,
status TEXT DEFAULT 'active',
FOREIGN KEY (location_id) REFERENCES sensor_locations (id)
)
''')
# Create sensor readings table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sensor_readings (
id INTEGER PRIMARY KEY,
sensor_id INTEGER,
timestamp DATETIME NOT NULL,
temperature REAL,
humidity REAL,
air_quality_index REAL,
co2_level REAL,
noise_level REAL,
light_level REAL,
pressure REAL,
particulate_matter REAL,
voc_level REAL,
battery_level REAL,
FOREIGN KEY (sensor_id) REFERENCES sensors (id)
)
''')
# Create alerts table
cursor.execute('''
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY,
sensor_id INTEGER,
timestamp DATETIME NOT NULL,
alert_type TEXT NOT NULL,
measurement_type TEXT NOT NULL,
measurement_value REAL,
threshold_value REAL,
message TEXT,
acknowledged BOOLEAN DEFAULT 0,
FOREIGN KEY (sensor_id) REFERENCES sensors (id)
)
''')
# Create thresholds table
cursor.execute('''
CREATE TABLE IF NOT EXISTS thresholds (
id INTEGER PRIMARY KEY,
measurement_type TEXT NOT NULL,
min_value REAL,
max_value REAL,
location_id INTEGER,
sensor_id INTEGER,
FOREIGN KEY (location_id) REFERENCES sensor_locations (id),
FOREIGN KEY (sensor_id) REFERENCES sensors (id)
)
''')
# Create external weather data table
cursor.execute('''
CREATE TABLE IF NOT EXISTS external_weather (
id INTEGER PRIMARY KEY,
timestamp DATETIME NOT NULL,
temperature REAL,
humidity REAL,
pressure REAL,
wind_speed REAL,
wind_direction TEXT,
precipitation REAL,
weather_condition TEXT,
source TEXT
)
''')
conn.commit()
conn.close()
self.load_sample_data()
def load_sample_data(self):
"""Load sample data if the database is empty."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Check if we already have data
cursor.execute("SELECT COUNT(*) FROM sensor_locations")
if cursor.fetchone()[0] > 0:
conn.close()
return
# Sample sensor locations
locations = [
(1, 'Science Building Lobby', 'Science Building', 1, 'Lobby', 40.7128, -74.0060, True, 'Main entrance area'),
(2, 'Science Building Lab 101', 'Science Building', 1, '101', 40.7128, -74.0061, True, 'Chemistry lab'),
(3, 'Science Building Roof', 'Science Building', 4, 'Roof', 40.7129, -74.0060, False, 'Outdoor sensor'),
(4, 'Library Main Hall', 'Library', 1, 'Main Hall', 40.7135, -74.0070, True, 'Reading area'),
(5, 'Student Center Cafeteria', 'Student Center', 1, 'Cafeteria', 40.7140, -74.0075, True, 'Dining area'),
(6, 'Engineering Building Room 201', 'Engineering Building', 2, '201', 40.7145, -74.0080, True, 'Computer lab'),
(7, 'Dormitory A Common Room', 'Dormitory A', 1, 'Common Room', 40.7150, -74.0090, True, 'Student lounge'),
(8, 'Sports Field', 'Outdoor', 0, 'Field', 40.7155, -74.0095, False, 'Soccer field')
]
cursor.executemany('''
INSERT INTO sensor_locations (id, location_name, building, floor, room, latitude, longitude, indoor, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', locations)
# Sample sensors
sensors = [
(1, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-01-15', '2023-06-15', 1, 'active'),
(2, 'air_quality', 'PMS5003', 'Plantower', '2023-01-15', '2023-06-15', 1, 'active'),
(3, 'noise', 'SEN-12642', 'SparkFun', '2023-01-15', '2023-06-15', 1, 'active'),
(4, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-01-20', '2023-06-20', 2, 'active'),
(5, 'air_quality', 'PMS5003', 'Plantower', '2023-01-20', '2023-06-20', 2, 'active'),
(6, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-01-25', '2023-06-25', 3, 'active'),
(7, 'air_quality', 'PMS5003', 'Plantower', '2023-01-25', '2023-06-25', 3, 'active'),
(8, 'noise', 'SEN-12642', 'SparkFun', '2023-01-25', '2023-06-25', 3, 'active'),
(9, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-01', '2023-07-01', 4, 'active'),
(10, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-05', '2023-07-05', 5, 'active'),
(11, 'air_quality', 'PMS5003', 'Plantower', '2023-02-05', '2023-07-05', 5, 'active'),
(12, 'noise', 'SEN-12642', 'SparkFun', '2023-02-05', '2023-07-05', 5, 'active'),
(13, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-10', '2023-07-10', 6, 'active'),
(14, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-15', '2023-07-15', 7, 'active'),
(15, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-20', '2023-07-20', 8, 'active'),
(16, 'air_quality', 'PMS5003', 'Plantower', '2023-02-20', '2023-07-20', 8, 'active')
]
cursor.executemany('''
INSERT INTO sensors (id, sensor_type, model, manufacturer, install_date, last_maintenance, location_id, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', sensors)
# Sample thresholds
thresholds = [
(1, 'temperature', 18.0, 28.0, None, None), # Global temperature thresholds
(2, 'humidity', 30.0, 60.0, None, None), # Global humidity thresholds
(3, 'air_quality_index', 0.0, 100.0, None, None), # Global AQI thresholds
(4, 'co2_level', 400.0, 1000.0, None, None), # Global CO2 thresholds
(5, 'noise_level', 40.0, 70.0, None, None), # Global noise thresholds
(6, 'temperature', 20.0, 24.0, 2, None), # Lab-specific temperature thresholds
(7, 'temperature', 22.0, 26.0, 5, None), # Cafeteria-specific temperature thresholds
(8, 'noise_level', 40.0, 60.0, 4, None), # Library-specific noise thresholds
(9, 'air_quality_index', 0.0, 50.0, 6, None) # Computer lab-specific AQI thresholds
]
cursor.executemany('''
INSERT INTO thresholds (id, measurement_type, min_value, max_value, location_id, sensor_id)
VALUES (?, ?, ?, ?, ?, ?)
''', thresholds)
conn.commit()
conn.close()
# Generate sample readings for the past 7 days
self.generate_sample_readings(days=7)
def generate_sample_readings(self, days=7):
"""Generate realistic sample sensor readings for testing."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get all sensors
cursor.execute("SELECT id, sensor_type, location_id FROM sensors")
sensors = cursor.fetchall()
# Current time
end_time = datetime.datetime.now()
start_time = end_time - datetime.timedelta(days=days)
# Sample at 15-minute intervals
interval_minutes = 15
# Generate readings for each sensor
readings = []
current_time = start_time
while current_time <= end_time:
for sensor_id, sensor_type, location_id in sensors:
# Base values with some daily patterns
hour_of_day = current_time.hour
# Temperature varies throughout the day
temp_base = 22.0 # Base indoor temperature
if location_id in [3, 8]: # Outdoor sensors
# More variation for outdoor sensors
temp_base = 15.0 + 10.0 * np.sin(np.pi * (hour_of_day - 6) / 12) # Peak at noon
else:
# Indoor follows outdoor but with less variation and some lag
temp_base = 21.0 + 2.0 * np.sin(np.pi * (hour_of_day - 8) / 12)
# Add some day-to-day variation
day_offset = (current_time.date() - start_time.date()).days
temp_base += np.sin(day_offset * 0.5) * 3 # Slow weekly pattern
# Add noise
temperature = temp_base + np.random.normal(0, 0.5)
# Humidity inversely related to temperature with some randomness
humidity_base = 80.0 - temperature * 1.5
humidity = max(30, min(95, humidity_base + np.random.normal(0, 5)))
# Air quality worse during busy hours
aqi_base = 50.0
if 8 <= hour_of_day <= 18: # Work hours
aqi_base = 60.0 + (10.0 * np.sin(np.pi * (hour_of_day - 8) / 10))
air_quality_index = max(20, min(150, aqi_base + np.random.normal(0, 8)))
# CO2 follows occupancy patterns
co2_base = 400.0 # Baseline outdoor level
if location_id not in [3, 8]: # Indoor
if 8 <= hour_of_day <= 18: # Work hours
co2_base = 600.0 + (200.0 * np.sin(np.pi * (hour_of_day - 8) / 10))
co2_level = max(400, min(1500, co2_base + np.random.normal(0, 30)))
# Noise level based on location and time
noise_base = 45.0
if location_id == 4: # Library
noise_base = 35.0
elif location_id == 5: # Cafeteria
if 11 <= hour_of_day <= 14 or 17 <= hour_of_day <= 19: # Meal times
noise_base = 65.0
else:
noise_base = 45.0
elif location_id in [3, 8]: # Outdoor
noise_base = 50.0 + (10.0 * np.sin(np.pi * (hour_of_day - 8) / 10))
noise_level = max(30, min(80, noise_base + np.random.normal(0, 5)))
# Light level based on time
light_base = 0.0
if 6 <= hour_of_day <= 18: # Daylight
light_base = 300.0 + (300.0 * np.sin(np.pi * (hour_of_day - 6) / 12))
light_level = max(0, min(800, light_base + np.random.normal(0, 30)))
# Pressure (relatively stable with weather patterns)
pressure_base = 1013.0 # Standard pressure
pressure = pressure_base + day_offset * 0.5 + np.random.normal(0, 1)
# PM2.5 levels correlate with air quality
pm_base = air_quality_index * 0.5
particulate_matter = max(5, min(300, pm_base + np.random.normal(0, 10)))
# VOC levels
voc_base = 400.0
if location_id == 2: # Chemistry lab
voc_base = 600.0
voc_level = max(100, min(1000, voc_base + np.random.normal(0, 50)))
# Battery level (slowly decreases over time, reset at maintenance)
days_since_maintenance = (current_time.date() - datetime.datetime.strptime(
cursor.execute("SELECT last_maintenance FROM sensors WHERE id = ?", (sensor_id,)).fetchone()[0],
"%Y-%m-%d"
).date()).days
battery_level = 100.0 - (days_since_maintenance * 0.5) + np.random.normal(0, 0.1)
battery_level = max(0, min(100, battery_level))
# Initialize with None, then set values based on sensor type
reading = {
'sensor_id': sensor_id,
'timestamp': current_time.isoformat(),
'temperature': None,
'humidity': None,
'air_quality_index': None,
'co2_level': None,
'noise_level': None,
'light_level': None,
'pressure': None,
'particulate_matter': None,
'voc_level': None,
'battery_level': battery_level
}
# Set values based on sensor type
if sensor_type == 'temperature_humidity':
reading['temperature'] = temperature
reading['humidity'] = humidity
reading['pressure'] = pressure
elif sensor_type == 'air_quality':
reading['air_quality_index'] = air_quality_index
reading['co2_level'] = co2_level
reading['particulate_matter'] = particulate_matter
reading['voc_level'] = voc_level
elif sensor_type == 'noise':
reading['noise_level'] = noise_level
reading['light_level'] = light_level
readings.append((
sensor_id,
current_time.isoformat(),
reading['temperature'],
reading['humidity'],
reading['air_quality_index'],
reading['co2_level'],
reading['noise_level'],
reading['light_level'],
reading['pressure'],
reading['particulate_matter'],
reading['voc_level'],
reading['battery_level']
))
# Increment time
current_time += datetime.timedelta(minutes=interval_minutes)
# Insert in batches to avoid SQLite limitations
batch_size = 1000
for i in range(0, len(readings), batch_size):
batch = readings[i:i+batch_size]
cursor.executemany('''
INSERT INTO sensor_readings (
sensor_id, timestamp, temperature, humidity, air_quality_index,
co2_level, noise_level, light_level, pressure, particulate_matter,
voc_level, battery_level
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', batch)
conn.commit()
# Generate some external weather data
weather_data = []
current_time = start_time
while current_time <= end_time:
# Only store at hourly intervals
if current_time.minute == 0:
# Base temperature varies by time of day
hour_of_day = current_time.hour
temp_base = 15.0 + 10.0 * np.sin(np.pi * (hour_of_day - 6) / 12) # Peak at noon
# Add some day-to-day variation
day_offset = (current_time.date() - start_time.date()).days
temp_base += np.sin(day_offset * 0.5) * 3 # Slow weekly pattern
temperature = temp_base + np.random.normal(0, 0.5)
# Humidity inversely related to temperature
humidity = 80.0 - temperature * 1.5 + np.random.normal(0, 5)
humidity = max(30, min(95, humidity))
# Pressure varies slowly with some randomness
pressure = 1013.0 + day_offset * 0.5 + np.random.normal(0, 1)
# Wind speed and direction
wind_speed = max(0, np.random.normal(5, 2))
directions = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
wind_direction = random.choice(directions)
# Precipitation (mostly zero with occasional rain)
precipitation = 0.0
if random.random() < 0.2: # 20% chance of rain
precipitation = random.uniform(0, 10)
# Weather condition
if precipitation > 1.0:
weather_condition = 'rain'
elif humidity > 80:
weather_condition = 'fog'
elif temperature > 25:
weather_condition = 'sunny'
else:
weather_condition = 'cloudy'
weather_data.append((
current_time.isoformat(),
temperature,
humidity,
pressure,
wind_speed,
wind_direction,
precipitation,
weather_condition,
'simulated'
))
current_time += datetime.timedelta(minutes=15)
cursor.executemany('''
INSERT INTO external_weather (
timestamp, temperature, humidity, pressure, wind_speed,
wind_direction, precipitation, weather_condition, source
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', weather_data)
conn.commit()
conn.close()
def get_sensor_info(self):
"""Get information about all sensors."""
conn = sqlite3.connect(self.db_path)
query = '''
SELECT s.id, s.sensor_type, s.model, s.manufacturer, s.status,
l.id as location_id, l.location_name, l.building, l.floor, l.room, l.indoor
FROM sensors s
JOIN sensor_locations l ON s.location_id = l.id
'''
df = pd.read_sql_query(query, conn)
conn.close()
return df
def get_latest_readings(self):
"""Get the latest reading from each sensor."""
conn = sqlite3.connect(self.db_path)
query = '''
WITH LatestReadings AS (
SELECT sensor_id, MAX(timestamp) as max_time
FROM sensor_readings
GROUP BY sensor_id
)
SELECT sr.*, s.sensor_type, l.location_name, l.building
FROM sensor_readings sr
JOIN LatestReadings lr ON sr.sensor_id = lr.sensor_id AND sr.timestamp = lr.max_time
JOIN sensors s ON sr.sensor_id = s.id
JOIN sensor_locations l ON s.location_id = l.id
'''
df = pd.read_sql_query(query, conn)
conn.close()
return df
def get_readings_for_period(self, start_time, end_time, sensor_ids=None, measurement_types=None):
"""Get sensor readings for a specific time period."""
conn = sqlite3.connect(self.db_path)
params = [start_time, end_time]
query = '''
SELECT sr.*, s.sensor_type, l.location_name, l.building
FROM sensor_readings sr
JOIN sensors s ON sr.sensor_id = s.id
JOIN sensor_locations l ON s.location_id = l.id
WHERE sr.timestamp >= ? AND sr.timestamp <= ?
'''
if sensor_ids:
placeholders = ','.join('?' * len(sensor_ids))
query += f' AND sr.sensor_id IN ({placeholders})'
params.extend(sensor_ids)
query += ' ORDER BY sr.timestamp'
df = pd.read_sql_query(query, conn, params=params)
conn.close()
# Filter by measurement types if specified
if measurement_types and len(df) > 0:
df = df[['timestamp', 'sensor_id', 'sensor_type', 'location_name', 'building'] + measurement_types]
return df
def get_thresholds(self, location_id=None, sensor_id=None):
"""Get threshold settings."""
conn = sqlite3.connect(self.db_path)
query = 'SELECT * FROM thresholds WHERE 1=1'
params = []
if location_id:
query += ' AND (location_id = ? OR location_id IS NULL)'
params.append(location_id)
if sensor_id:
query += ' AND (sensor_id = ? OR sensor_id IS NULL)'
params.append(sensor_id)
df = pd.read_sql_query(query, conn, params=params)
conn.close()
return df
def add_sensor_reading(self, sensor_id, data):
"""Add a new sensor reading to the database."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Check that sensor exists
cursor.execute("SELECT id FROM sensors WHERE id = ?", (sensor_id,))
if not cursor.fetchone():
conn.close()
return False
# Prepare data
timestamp = data.get('timestamp', datetime.datetime.now().isoformat())
cursor.execute('''
INSERT INTO sensor_readings (
sensor_id, timestamp, temperature, humidity, air_quality_index,
co2_level, noise_level, light_level, pressure, particulate_matter,
voc_level, battery_level
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
sensor_id,
timestamp,
data.get('temperature'),
data.get('humidity'),
data.get('air_quality_index'),
data.get('co2_level'),
data.get('noise_level'),
data.get('light_level'),
data.get('pressure'),
data.get('particulate_matter'),
data.get('voc_level'),
data.get('battery_level')
))
conn.commit()
# Check against thresholds
self.check_thresholds(conn, sensor_id, data)
conn.close()
return True
def check_thresholds(self, conn, sensor_id, data):
"""Check if readings exceed thresholds and create alerts."""
cursor = conn.cursor()
# Get sensor location
cursor.execute("SELECT location_id FROM sensors WHERE id = ?", (sensor_id,))
location_id = cursor.fetchone()[0]
# Get applicable thresholds
cursor.execute('''
SELECT measurement_type, min_value, max_value
FROM thresholds
WHERE (location_id = ? OR location_id IS NULL)
AND (sensor_id = ? OR sensor_id IS NULL)
''', (location_id, sensor_id))
thresholds = cursor.fetchall()
# Check each measurement against thresholds
timestamp = data.get('timestamp', datetime.datetime.now().isoformat())
for measurement_type, min_value, max_value in thresholds:
value = data.get(measurement_type)
if value is None:
continue
if min_value is not None and value < min_value:
# Create low threshold alert
cursor.execute('''
INSERT INTO alerts (
sensor_id, timestamp, alert_type, measurement_type,
measurement_value, threshold_value, message
) VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
sensor_id,
timestamp,
'low',
measurement_type,
value,
min_value,
f"{measurement_type} reading ({value}) below minimum threshold ({min_value})"
))
if max_value is not None and value > max_value:
# Create high threshold alert
cursor.execute('''
INSERT INTO alerts (
sensor_id, timestamp, alert_type, measurement_type,
measurement_value, threshold_value, message
) VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
sensor_id,
timestamp,
'high',
measurement_type,
value,
max_value,
f"{measurement_type} reading ({value}) above maximum threshold ({max_value})"
))
conn.commit()
def get_alerts(self, start_time=None, end_time=None, acknowledged=None):
"""Get alerts with optional filtering."""
conn = sqlite3.connect(self.db_path)
query = '''
SELECT a.*, s.sensor_type, l.location_name, l.building
FROM alerts a
JOIN sensors s ON a.sensor_id = s.id
JOIN sensor_locations l ON s.location_id = l.id
WHERE 1=1
'''
params = []
if start_time:
query += ' AND a.timestamp >= ?'
params.append(start_time)
if end_time:
query += ' AND a.timestamp <= ?'
params.append(end_time)
if acknowledged is not None:
query += ' AND a.acknowledged = ?'
params.append(1 if acknowledged else 0)
query += ' ORDER BY a.timestamp DESC'
df = pd.read_sql_query(query, conn, params=params)
conn.close()
return df
def acknowledge_alert(self, alert_id):
"""Mark an alert as acknowledged."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("UPDATE alerts SET acknowledged = 1 WHERE id = ?", (alert_id,))
rows_affected = cursor.rowcount
conn.commit()
conn.close()
return rows_affected > 0
def get_external_weather(self, start_time, end_time):
"""Get external weather data for a specific time period."""
conn = sqlite3.connect(self.db_path)
query = '''
SELECT *
FROM external_weather
WHERE timestamp >= ? AND timestamp <= ?
ORDER BY timestamp
'''
df = pd.read_sql_query(query, conn, params=[start_time, end_time])
conn.close()
return df
class DataAnalyzer:
"""Analyze environmental data for patterns and anomalies."""
def __init__(self, db):
self.db = db
def detect_anomalies(self, data, columns, contamination=0.05):
"""Detect anomalies in sensor readings using Isolation Forest."""
if len(data) < 10:
return {}
# Prepare data for anomaly detection
features = data[columns].copy()
# Handle missing values
features = features.fillna(method='ffill').fillna(method='bfill')
if features.isna().any().any():
# If still have NaNs, drop those columns
features = features.dropna(axis=1)
if len(features.columns) == 0 or len(features) < 10:
return {}
# Standardize features
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
# Train isolation forest
clf = IsolationForest(contamination=contamination, random_state=42)
predictions = clf.fit_predict(scaled_features)
# Anomaly scores
scores = clf.decision_function(scaled_features)
# Identify anomalies
anomalies = {}
for i, pred in enumerate(predictions):
if pred == -1: # Anomaly
timestamp = data.iloc[i]['timestamp']
anomaly_data = {
'score': scores[i],
'values': {col: data.iloc[i][col] for col in columns if not pd.isna(data.iloc[i][col])}
}
anomalies[timestamp] = anomaly_data
return anomalies
def identify_patterns(self, data, column, period='daily'):
"""Identify patterns in sensor data (e.g., daily patterns)."""
if len(data) < 24:
return None
# Convert timestamp to datetime if it's not already
if not pd.api.types.is_datetime64_any_dtype(data['timestamp']):
data = data.copy()
data['timestamp'] = pd.to_datetime(data['timestamp'])
# Extract time components
if period == 'daily':
data['hour'] = data['timestamp'].dt.hour
groupby_col = 'hour'
elif period == 'weekly':
data['day_of_week'] = data['timestamp'].dt.dayofweek
groupby_col = 'day_of_week'
elif period == 'monthly':
data['day'] = data['timestamp'].dt.day
groupby_col = 'day'
else:
return None
# Group by time component and calculate statistics
pattern = data.groupby(groupby_col)[column].agg(['mean', 'std', 'min', 'max']).reset_index()
return pattern
def correlate_with_weather(self, sensor_data, weather_data, sensor_column, weather_column):
"""Correlate internal sensor readings with external weather data."""
if len(sensor_data) < 10 or len(weather_data) < 10:
return None
# Convert timestamps to datetime if they're not already
if not pd.api.types.is_datetime64_any_dtype(sensor_data['timestamp']):
sensor_data = sensor_data.copy()
sensor_data['timestamp'] = pd.to_datetime(sensor_data['timestamp'])
if not pd.api.types.is_datetime64_any_dtype(weather_data['timestamp']):
weather_data = weather_data.copy()
weather_data['timestamp'] = pd.to_datetime(weather_data['timestamp'])
# Merge data on closest timestamp
sensor_data['timestamp_key'] = sensor_data['timestamp'].dt.floor('H')
weather_data['timestamp_key'] = weather_data['timestamp'].dt.floor('H')
merged = pd.merge_asof(
sensor_data.sort_values('timestamp'),
weather_data.sort_values('timestamp')[['timestamp', weather_column, 'timestamp_key']],
left_on='timestamp',
right_on='timestamp',
direction='nearest',
tolerance=pd.Timedelta('1H')
)
# Drop rows with missing values
valid_data = merged.dropna(subset=[sensor_column, weather_column])
if len(valid_data) < 10:
return None
# Calculate correlation
correlation, p_value = stats.pearsonr(valid_data[sensor_column], valid_data[weather_column])
return {
'correlation': correlation,
'p_value': p_value,
'significant': p_value < 0.05,
'sample_size': len(valid_data)
}
def calculate_statistics(self, data, columns, groupby=None):
"""Calculate summary statistics for sensor readings."""
if len(data) == 0:
return {}
stats = {}
if groupby:
# Group by specified column(s)
grouped = data.groupby(groupby)
for col in columns:
if col in data.columns:
stats[col] = {
'by_group': grouped[col].agg(['count', 'mean', 'std', 'min', 'max']).to_dict()
}
else:
# Overall statistics
for col in columns:
if col in data.columns:
stats[col] = {
'count': data[col].count(),
'mean': data[col].mean(),
'std': data[col].std(),
'min': data[col].min(),
'max': data[col].max(),
'median': data[col].median(),
'q1': data[col].quantile(0.25),
'q3': data[col].quantile(0.75)
}
return stats
def forecast_trends(self, data, column, periods=24):
"""Simple forecasting using historical patterns."""
if len(data) < 48:
return None
# Convert timestamp to datetime if it's not already
if not pd.api.types.is_datetime64_any_dtype(data['timestamp']):
data = data.copy()
data['timestamp'] = pd.to_datetime(data['timestamp'])
# Resample to hourly data
hourly_data = data.set_index('timestamp')[column].resample('H').mean()
# Fill missing values
hourly_data = hourly_data.fillna(method='ffill').fillna(method='bfill')
if hourly_data.isna().any():
return None
# Determine seasonality (24 hours for daily pattern)
seasonality = 24
# Simple forecasting using seasonal mean
last_period = hourly_data.iloc[-seasonality:].reset_index(drop=True)
forecast = []
for i in range(periods):
# Use the corresponding hour from the last observed period
forecast_value = last_period[i % len(last_period)]
forecast.append(forecast_value)
# Generate forecast timestamps
last_timestamp = hourly_data.index[-1]
forecast_timestamps = [last_timestamp + datetime.timedelta(hours=i+1) for i in range(periods)]
forecast_df = pd.DataFrame({
'timestamp': forecast_timestamps,
f'forecast_{column}': forecast
})
return forecast_df
class NotificationManager:
"""Manage alerts and notifications for threshold violations."""
def __init__(self, email_config=None, webhook_url=None):
self.email_config = email_config or {}
self.webhook_url = webhook_url
def send_email_notification(self, subject, message):
"""Send an email notification."""
if not self.email_config:
logger.warning("Email configuration not provided, skipping email notification")
return False
try:
msg = EmailMessage()
msg.set_content(message)
msg['Subject'] = subject
msg['From'] = self.email_config.get('sender', 'noreply@example.com')
msg['To'] = self.email_config.get('recipient', 'admin@example.com')
with smtplib.SMTP(self.email_config.get('server', 'localhost'),
self.email_config.get('port', 25)) as server:
if self.email_config.get('use_tls', False):
server.starttls()
if 'username' in self.email_config and 'password' in self.email_config:
server.login(self.email_config['username'], self.email_config['password'])
server.send_message(msg)
logger.info(f"Email notification sent: {subject}")
return True
except Exception as e:
logger.error(f"Failed to send email notification: {str(e)}")
return False
def send_webhook_notification(self, payload):
"""Send a notification to a webhook."""
if not self.webhook_url:
logger.warning("Webhook URL not provided, skipping webhook notification")
return False
try:
response = requests.post(
self.webhook_url,
json=payload,
headers={'Content-Type': 'application/json'}
)
if response.status_code >= 200 and response.status_code < 300:
logger.info(f"Webhook notification sent successfully")
return True
else:
logger.warning(f"Webhook returned non-success status code: {response.status_code}")
return False
except Exception as e:
logger.error(f"Failed to send webhook notification: {str(e)}")
return False
def process_alert(self, alert):
"""Process an alert and send appropriate notifications."""
# Format alert message
subject = f"Environmental Alert: {alert['alert_type'].title()} {alert['measurement_type']} at {alert['location_name']}"
message = f"""
Environmental Monitoring System Alert
Location: {alert['building']} - {alert['location_name']}
Sensor Type: {alert['sensor_type']}
Alert Type: {alert['alert_type'].title()}
Measurement: {alert['measurement_type']}
Value: {alert['measurement_value']}
Threshold: {alert['threshold_value']}
Timestamp: {alert['timestamp']}
Message: {alert['message']}
This is an automated alert from the Environmental Monitoring System.
"""
# Send email notification
email_sent = self.send_email_notification(subject, message)
# Send webhook notification
webhook_payload = {
'alert_id': alert['id'],
'alert_type': alert['alert_type'],
'measurement_type': alert['measurement_type'],
'measurement_value': float(alert['measurement_value']),
'threshold_value': float(alert['threshold_value']),
'location': alert['location_name'],
'building': alert['building'],
'sensor_type': alert['sensor_type'],
'timestamp': alert['timestamp'],
'message': alert['message']
}
webhook_sent = self.send_webhook_notification(webhook_payload)
return {
'email_sent': email_sent,
'webhook_sent': webhook_sent
}
# Initialize database
db = EnvironmentDatabase()
analyzer = DataAnalyzer(db)
notification_manager = NotificationManager(
webhook_url='http://example.com/webhook' # Replace with actual webhook URL
)
# Flask API
server = Flask(__name__)
@server.route('/api/sensors', methods=['GET'])
def get_sensors():
sensors = db.get_sensor_info()
return jsonify(sensors.to_dict(orient='records'))
@server.route('/api/readings/latest', methods=['GET'])
def get_latest_readings():
readings = db.get_latest_readings()
return jsonify(readings.to_dict(orient='records'))
@server.route('/api/readings', methods=['GET'])
def get_readings():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=1)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())
sensor_ids = request.args.getlist('sensor_id')
sensor_ids = [int(s) for s in sensor_ids] if sensor_ids else None
measurement_types = request.args.getlist('measurement_type')
readings = db.get_readings_for_period(start_time, end_time, sensor_ids, measurement_types)
return jsonify(readings.to_dict(orient='records'))
@server.route('/api/sensors/<int:sensor_id>/readings', methods=['POST'])
def add_reading(sensor_id):
data = request.json
success = db.add_sensor_reading(sensor_id, data)
if success:
return jsonify({'status': 'success', 'message': 'Reading added successfully'})
else:
return jsonify({'status': 'error', 'message': 'Failed to add reading'}), 400
@server.route('/api/alerts', methods=['GET'])
def get_alerts():
start_time = request.args.get('start_time')
end_time = request.args.get('end_time')
acknowledged = request.args.get('acknowledged')
if acknowledged is not None:
acknowledged = acknowledged.lower() == 'true'
alerts = db.get_alerts(start_time, end_time, acknowledged)
return jsonify(alerts.to_dict(orient='records'))
@server.route('/api/alerts/<int:alert_id>/acknowledge', methods=['POST'])
def acknowledge_alert(alert_id):
success = db.acknowledge_alert(alert_id)
if success:
return jsonify({'status': 'success', 'message': 'Alert acknowledged'})
else:
return jsonify({'status': 'error', 'message': 'Alert not found'}), 404
@server.route('/api/analysis/anomalies', methods=['GET'])
def detect_anomalies():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=7)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())
sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)
measurement_type = request.args.get('measurement_type', 'temperature')
readings = db.get_readings_for_period(start_time, end_time, [sensor_id] if sensor_id else None)
if len(readings) == 0:
return jsonify({'status': 'error', 'message': 'No data available for analysis'}), 400
columns = [col for col in readings.columns if col in [
'temperature', 'humidity', 'air_quality_index', 'co2_level',
'noise_level', 'light_level', 'pressure', 'particulate_matter', 'voc_level'
] and not readings[col].isna().all()]
if measurement_type in columns:
columns = [measurement_type]
if not columns:
return jsonify({'status': 'error', 'message': 'No valid measurement data available'}), 400
anomalies = analyzer.detect_anomalies(readings, columns)
return jsonify({
'status': 'success',
'anomalies': anomalies,
'total': len(anomalies)
})
@server.route('/api/analysis/patterns', methods=['GET'])
def identify_patterns():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=7)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())
sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)
measurement_type = request.args.get('measurement_type', 'temperature')
period = request.args.get('period', 'daily')
readings = db.get_readings_for_period(start_time, end_time, [sensor_id] if sensor_id else None)
if len(readings) == 0:
return jsonify({'status': 'error', 'message': 'No data available for analysis'}), 400
if measurement_type not in readings.columns or readings[measurement_type].isna().all():
return jsonify({'status': 'error', 'message': f'No {measurement_type} data available'}), 400
pattern = analyzer.identify_patterns(readings, measurement_type, period)
if pattern is None:
return jsonify({'status': 'error', 'message': 'Failed to identify patterns'}), 400
return jsonify({
'status': 'success',
'period': period,
'measurement_type': measurement_type,
'pattern': pattern.to_dict(orient='records')
})
@server.route('/api/analysis/weather-correlation', methods=['GET'])
def correlate_with_weather():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=30)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())
sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)
else:
return jsonify({'status': 'error', 'message': 'Sensor ID is required'}), 400
sensor_measurement = request.args.get('sensor_measurement', 'temperature')
weather_measurement = request.args.get('weather_measurement', 'temperature')
# Get sensor data
sensor_data = db.get_readings_for_period(start_time, end_time, [sensor_id])
if len(sensor_data) == 0 or sensor_measurement not in sensor_data.columns or sensor_data[sensor_measurement].isna().all():
return jsonify({'status': 'error', 'message': f'No {sensor_measurement} data available for sensor {sensor_id}'}), 400
# Get weather data
weather_data = db.get_external_weather(start_time, end_time)
if len(weather_data) == 0 or weather_measurement not in weather_data.columns or weather_data[weather_measurement].isna().all():
return jsonify({'status': 'error', 'message': f'No {weather_measurement} weather data available'}), 400
correlation = analyzer.correlate_with_weather(sensor_data, weather_data, sensor_measurement, weather_measurement)
if correlation is None:
return jsonify({'status': 'error', 'message': 'Failed to calculate correlation'}), 400
return jsonify({
'status': 'success',
'sensor_measurement': sensor_measurement,
'weather_measurement': weather_measurement,
'correlation': correlation
})
@server.route('/api/analysis/statistics', methods=['GET'])
def calculate_statistics():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=7)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())
sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)
measurement_types = request.args.getlist('measurement_type')
if not measurement_types:
measurement_types = [
'temperature', 'humidity', 'air_quality_index', 'co2_level',
'noise_level', 'light_level', 'pressure', 'particulate_matter', 'voc_level'
]
groupby = request.args.getlist('groupby')
readings = db.get_readings_for_period(start_time, end_time, [sensor_id] if sensor_id else None)
if len(readings) == 0:
return jsonify({'status': 'error', 'message': 'No data available for analysis'}), 400
statistics = analyzer.calculate_statistics(readings, measurement_types, groupby if groupby else None)
return jsonify({
'status': 'success',
'statistics': statistics
})
@server.route('/api/analysis/forecast', methods=['GET'])
def forecast_trends():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=14)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())
sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)
else:
return jsonify({'status': 'error', 'message': 'Sensor ID is required'}), 400
measurement_type = request.args.get('measurement_type', 'temperature')
periods = request.args.get('periods', 24, type=int)
readings = db.get_readings_for_period(start_time, end_time, [sensor_id])
if len(readings) == 0 or measurement_type not in readings.columns or readings[measurement_type].isna().all():
return jsonify({'status': 'error', 'message': f'No {measurement_type} data available for forecast'}), 400
forecast = analyzer.forecast_trends(readings, measurement_type, periods)
if forecast is None:
return jsonify({'status': 'error', 'message': 'Failed to generate forecast'}), 400
return jsonify({
'status': 'success',
'measurement_type': measurement_type,
'forecast': forecast.to_dict(orient='records')
})
# Dash App
dash_app = dash.Dash(__name__, server=server, url_base_pathname='/dashboard/')
dash_app.title = 'Environmental Monitoring Dashboard'
# Define the layout
dash_app.layout = html.Div([
html.Div([
html.H1("Environmental Monitoring Dashboard", style={'margin-bottom': '20px'}),
dcc.Tabs([
dcc.Tab(label="Overview", children=[
html.Div([
html.H2("Current Environmental Conditions"),
html.Div(id="last-updated"),
html.Div([
html.Div([
html.H3("Temperature"),
dcc.Graph(id='temp-gauge')
], className="four columns"),
html.Div([
html.H3("Humidity"),
dcc.Graph(id='humidity-gauge')
], className="four columns"),
html.Div([
html.H3("Air Quality"),
dcc.Graph(id='aqi-gauge')
], className="four columns"),
], className="row"),
html.Div([
html.Div([
html.H3("CO2 Levels"),
dcc.Graph(id='co2-gauge')
], className="four columns"),
html.Div([
html.H3("Noise Levels"),
dcc.Graph(id='noise-gauge')
], className="four columns"),
html.Div([
html.H3("Locations Map"),
dcc.Graph(id='locations-map')
], className="four columns"),
], className="row"),
html.H2("Alerts", style={'margin-top': '30px'}),
html.Div(id='alerts-table')
])
]),
dcc.Tab(label="Detailed Analysis", children=[
html.Div([
html.Div([
html.H3("Filters"),
html.Label("Select Location/Sensor:"),
dcc.Dropdown(id='sensor-dropdown'),
html.Label("Select Measurement:"),
dcc.Dropdown(
id='measurement-dropdown',
options=[
{'label': 'Temperature', 'value': 'temperature'},
{'label': 'Humidity', 'value': 'humidity'},
{'label': 'Air Quality Index', 'value': 'air_quality_index'},
{'label': 'CO2 Level', 'value': 'co2_level'},
{'label': 'Noise Level', 'value': 'noise_level'},
{'label': 'Light Level', 'value': 'light_level'},
{'label': 'Pressure', 'value': 'pressure'},
{'label': 'Particulate Matter', 'value': 'particulate_matter'},
{'label': 'VOC Level', 'value': 'voc_level'}
],
value='temperature'
),
html.Label("Time Range:"),
dcc.DatePickerRange(
id='date-picker',
start_date=(datetime.datetime.now() - datetime.timedelta(days=7)).date(),
end_date=datetime.datetime.now().date()
),
html.Button('Apply Filters', id='apply-filters', n_clicks=0)
], className="three columns"),
html.Div([
html.H3("Historical Data"),
dcc.Graph(id='time-series-graph'),
html.H3("Daily Patterns", style={'margin-top': '30px'}),
dcc.Graph(id='daily-pattern-graph'),
html.H3("Weather Correlation", style={'margin-top': '30px'}),
dcc.Graph(id='weather-correlation-graph')
], className="nine columns")
], className="row")
]),
dcc.Tab(label="Forecasting", children=[
html.Div([
html.H2("Environmental Forecasts"),
html.Div([
html.Div([
html.Label("Select Sensor:"),
dcc.Dropdown(id='forecast-sensor-dropdown'),
html.Label("Select Measurement:"),
dcc.Dropdown(
id='forecast-measurement-dropdown',
options=[
{'label': 'Temperature', 'value': 'temperature'},
{'label': 'Humidity', 'value': 'humidity'},
{'label': 'Air Quality Index', 'value': 'air_quality_index'},
{'label': 'CO2 Level', 'value': 'co2_level'},
{'label': 'Noise Level', 'value': 'noise_level'}
],
value='temperature'
),
html.Label("Forecast Period (hours):"),
dcc.Slider(
id='forecast-period-slider',
min=12,
max=72,
value=24,
marks={i: str(i) for i in range(12, 73, 12)},
step=12
),
html.Button('Generate Forecast', id='generate-forecast', n_clicks=0)
], className="three columns"),
html.Div([
dcc.Graph(id='forecast-graph')
], className="nine columns")
], className="row")
])
])
])
], style={'padding': '20px'})
])
# Callbacks
@dash_app.callback(
[Output('sensor-dropdown', 'options'),
Output('sensor-dropdown', 'value'),
Output('forecast-sensor-dropdown', 'options'),
Output('forecast-sensor-dropdown', 'value')],
[Input('apply-filters', 'n_clicks')]
)
def update_dropdowns(n_clicks):
sensors = db.get_sensor_info()
# Create options for dropdowns
options = []
for _, sensor in sensors.iterrows():
label = f"{sensor['location_name']} - {sensor['sensor_type']}"
value = sensor['id']
options.append({'label': label, 'value': value})
return options, options[0]['value'] if options else None, options, options[0]['value'] if options else None
@dash_app.callback(
[Output('temp-gauge', 'figure'),
Output('humidity-gauge', 'figure'),
Output('aqi-gauge', 'figure'),
Output('co2-gauge', 'figure'),
Output('noise-gauge', 'figure'),
Output('last-updated', 'children')],
[Input('apply-filters', 'n_clicks')]
)
def update_gauges(n_clicks):
latest_readings = db.get_latest_readings()
# Temperature gauge
temp_data = latest_readings[['sensor_id', 'temperature', 'location_name']].dropna(subset=['temperature'])
avg_temp = temp_data['temperature'].mean() if not temp_data.empty else None
temp_fig = go.Figure()
if avg_temp is not None:
temp_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_temp,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [0, 40]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 18], 'color': "lightblue"},
{'range': [18, 22], 'color': "green"},
{'range': [22, 28], 'color': "yellow"},
{'range': [28, 40], 'color': "red"}
]
}
))
temp_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))
# Humidity gauge
humidity_data = latest_readings[['sensor_id', 'humidity', 'location_name']].dropna(subset=['humidity'])
avg_humidity = humidity_data['humidity'].mean() if not humidity_data.empty else None
humidity_fig = go.Figure()
if avg_humidity is not None:
humidity_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_humidity,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [0, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 30], 'color': "orange"},
{'range': [30, 60], 'color': "green"},
{'range': [60, 100], 'color': "lightblue"}
]
}
))
humidity_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))
# AQI gauge
aqi_data = latest_readings[['sensor_id', 'air_quality_index', 'location_name']].dropna(subset=['air_quality_index'])
avg_aqi = aqi_data['air_quality_index'].mean() if not aqi_data.empty else None
aqi_fig = go.Figure()
if avg_aqi is not None:
aqi_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_aqi,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [0, 300]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 50], 'color': "green"},
{'range': [50, 100], 'color': "yellow"},
{'range': [100, 150], 'color': "orange"},
{'range': [150, 200], 'color': "red"},
{'range': [200, 300], 'color': "purple"}
]
}
))
aqi_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))
# CO2 gauge
co2_data = latest_readings[['sensor_id', 'co2_level', 'location_name']].dropna(subset=['co2_level'])
avg_co2 = co2_data['co2_level'].mean() if not co2_data.empty else None
co2_fig = go.Figure()
if avg_co2 is not None:
co2_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_co2,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [300, 2000]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [300, 600], 'color': "green"},
{'range': [600, 1000], 'color': "yellow"},
{'range': [1000, 1500], 'color': "orange"},
{'range': [1500, 2000], 'color': "red"}
]
}
))
co2_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))
# Noise gauge
noise_data = latest_readings[['sensor_id', 'noise_level', 'location_name']].dropna(subset=['noise_level'])
avg_noise = noise_data['noise_level'].mean() if not noise_data.empty else None
noise_fig = go.Figure()
if avg_noise is not None:
noise_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_noise,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [20, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [20, 40], 'color': "green"},
{'range': [40, 60], 'color': "yellow"},
{'range': [60, 80], 'color': "orange"},
{'range': [80, 100], 'color': "red"}
]
}
))
noise_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))
# Last updated
last_updated = latest_readings['timestamp'].max() if not latest_readings.empty else None
if last_updated:
last_updated_text = html.P(f"Last updated: {pd.to_datetime(last_updated).strftime('%Y-%m-%d %H:%M:%S')}")
else:
last_updated_text = html.P("No data available")
return temp_fig, humidity_fig, aqi_fig, co2_fig, noise_fig, last_updated_text
@dash_app.callback(
Output('locations-map', 'figure'),
[Input('apply-filters', 'n_clicks')]
)
def update_map(n_clicks):
# Get sensor locations
sensors = db.get_sensor_info()
# Create map
fig = go.Figure()
if not sensors.empty:
# Add indoor locations (buildings)
buildings = sensors[sensors['indoor'] == True].drop_duplicates(['building', 'location_name'])
for _, building in buildings.iterrows():
fig.add_trace(go.Scattergeo(
lon=[building['longitude']],
lat=[building['latitude']],
text=f"{building['building']} - {building['location_name']}",
mode='markers',
marker=dict(
size=10,
color='blue',
symbol='square'
),
name=building['building']
))
# Add outdoor locations
outdoors = sensors[sensors['indoor'] == False].drop_duplicates(['location_name'])
if not outdoors.empty:
fig.add_trace(go.Scattergeo(
lon=outdoors['longitude'],
lat=outdoors['latitude'],
text=outdoors['location_name'],
mode='markers',
marker=dict(
size=10,
color='green',
symbol='circle'
),
name='Outdoor Sensors'
))
# Center the map
center_lat = sensors['latitude'].mean()
center_lon = sensors['longitude'].mean()
fig.update_geos(
center=dict(lat=center_lat, lon=center_lon),
projection_scale=15, # Adjust zoom level
showcoastlines=True, coastlinecolor="RebeccaPurple",
showland=True, landcolor="LightGreen",
showocean=True, oceancolor="LightBlue",
showlakes=True, lakecolor="Blue"
)
fig.update_layout(
height=300,
margin=dict(l=20, r=20, t=30, b=20),
legend=dict(
yanchor="top",
y=0.99,
xanchor="left",
x=0.01
)
)
return fig
@dash_app.callback(
Output('alerts-table', 'children'),
[Input('apply-filters', 'n_clicks')]
)
def update_alerts_table(n_clicks):
# Get recent unacknowledged alerts
alerts = db.get_alerts(
start_time=(datetime.datetime.now() - datetime.timedelta(days=1)).isoformat(),
acknowledged=False
)
if alerts.empty:
return html.P("No active alerts")
# Sort by timestamp, most recent first
alerts = alerts.sort_values('timestamp', ascending=False)
# Create table
table = html.Table(
# Header
[html.Tr([
html.Th("Time"),
html.Th("Location"),
html.Th("Type"),
html.Th("Measurement"),
html.Th("Value"),
html.Th("Threshold"),
html.Th("Action")
])] +
# Body
[html.Tr([
html.Td(pd.to_datetime(row['timestamp']).strftime('%Y-%m-%d %H:%M')),
html.Td(f"{row['building']} - {row['location_name']}"),
html.Td(row['alert_type'].capitalize()),
html.Td(row['measurement_type'].replace('_', ' ').capitalize()),
html.Td(f"{row['measurement_value']:.1f}"),
html.Td(f"{row['threshold_value']:.1f}"),
html.Td(html.Button("Acknowledge", id={'type': 'ack-button', 'index': row['id']}))
]) for _, row in alerts.iterrows()]
)
return table
@dash_app.callback(
[Output('time-series-graph', 'figure'),
Output('daily-pattern-graph', 'figure'),
Output('weather-correlation-graph', 'figure')],
[Input('apply-filters', 'n_clicks')],
[State('sensor-dropdown', 'value'),
State('measurement-dropdown', 'value'),
State('date-picker', 'start_date'),
State('date-picker', 'end_date')]
)
def update_analysis_graphs(n_clicks, sensor_id, measurement, start_date, end_date):
if not sensor_id or not measurement:
# Return empty figures
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'No data selected',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig, empty_fig, empty_fig
# Convert dates to ISO format
start_time = f"{start_date}T00:00:00"
end_time = f"{end_date}T23:59:59"
# Get sensor data
readings = db.get_readings_for_period(start_time, end_time, [sensor_id])
if readings.empty or measurement not in readings.columns or readings[measurement].isna().all():
# Return empty figures
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'No data available',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig, empty_fig, empty_fig
# Get sensor location info
sensor_info = db.get_sensor_info()
location_info = sensor_info[sensor_info['id'] == sensor_id].iloc[0]
location_name = f"{location_info['building']} - {location_info['location_name']}"
# Time series graph
time_series_fig = go.Figure()
# Add the measurement data
time_series_fig.add_trace(go.Scatter(
x=pd.to_datetime(readings['timestamp']),
y=readings[measurement],
mode='lines',
name=measurement.replace('_', ' ').title()
))
# Get threshold values for the measurement
thresholds = db.get_thresholds(location_id=location_info['location_id'])
measurement_thresholds = thresholds[thresholds['measurement_type'] == measurement]
if not measurement_thresholds.empty:
min_val = measurement_thresholds['min_value'].iloc[0]
max_val = measurement_thresholds['max_value'].iloc[0]
if min_val is not None:
time_series_fig.add_shape(
type="line",
x0=readings['timestamp'].min(),
y0=min_val,
x1=readings['timestamp'].max(),
y1=min_val,
line=dict(color="red", width=2, dash="dash")
)
if max_val is not None:
time_series_fig.add_shape(
type="line",
x0=readings['timestamp'].min(),
y0=max_val,
x1=readings['timestamp'].max(),
y1=max_val,
line=dict(color="red", width=2, dash="dash")
)
time_series_fig.update_layout(
title=f"{measurement.replace('_', ' ').title()} Over Time - {location_name}",
xaxis_title="Time",
yaxis_title=measurement.replace('_', ' ').title(),
hovermode="x unified"
)
# Daily pattern graph
pattern = analyzer.identify_patterns(readings, measurement, 'daily')
if pattern is None:
pattern_fig = go.Figure()
pattern_fig.update_layout(
annotations=[{
'text': 'Insufficient data for pattern analysis',
'showarrow': False,
'font': {'size': 20}
}]
)
else:
pattern_fig = go.Figure()
# Add mean line
pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['mean'],
mode='lines+markers',
name='Average',
line=dict(color='blue', width=2)
))
# Add range (mean ± std)
pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['mean'] + pattern['std'],
mode='lines',
line=dict(width=0),
showlegend=False
))
pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['mean'] - pattern['std'],
mode='lines',
line=dict(width=0),
fill='tonexty',
fillcolor='rgba(0, 0, 255, 0.2)',
name='Std Dev'
))
# Add min and max
pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['min'],
mode='lines',
line=dict(color='green', width=1, dash='dot'),
name='Min'
))
pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['max'],
mode='lines',
line=dict(color='red', width=1, dash='dot'),
name='Max'
))
pattern_fig.update_layout(
title=f"Daily Pattern - {measurement.replace('_', ' ').title()} - {location_name}",
xaxis_title="Hour of Day",
yaxis_title=measurement.replace('_', ' ').title(),
xaxis=dict(tickmode='linear', tick0=0, dtick=2),
hovermode="x unified"
)
# Weather correlation graph
weather_measurement = 'temperature' if measurement != 'temperature' else 'humidity'
# Get weather data
weather_data = db.get_external_weather(start_time, end_time)
if weather_data.empty or weather_measurement not in weather_data.columns or weather_data[weather_measurement].isna().all():
correlation_fig = go.Figure()
correlation_fig.update_layout(
annotations=[{
'text': 'No weather data available for correlation',
'showarrow': False,
'font': {'size': 20}
}]
)
else:
correlation = analyzer.correlate_with_weather(readings, weather_data, measurement, weather_measurement)
if correlation is None:
correlation_fig = go.Figure()
correlation_fig.update_layout(
annotations=[{
'text': 'Insufficient data for correlation analysis',
'showarrow': False,
'font': {'size': 20}
}]
)
else:
# Create a merged dataset for the scatter plot
sensor_data = readings[['timestamp', measurement]].copy()
sensor_data['timestamp'] = pd.to_datetime(sensor_data['timestamp'])
weather_data = weather_data[['timestamp', weather_measurement]].copy()
weather_data['timestamp'] = pd.to_datetime(weather_data['timestamp'])
# Align each sensor reading with the nearest weather observation within one hour
merged = pd.merge_asof(
sensor_data.sort_values('timestamp'),
weather_data.sort_values('timestamp'),
on='timestamp',
direction='nearest',
tolerance=pd.Timedelta('1h')
)
# Drop rows with missing values
merged = merged.dropna(subset=[measurement, weather_measurement])
correlation_fig = go.Figure(data=go.Scatter(
x=merged[weather_measurement],
y=merged[measurement],
mode='markers',
marker=dict(
size=8,
color='blue',
opacity=0.6
)
))
# Add a best fit line
if len(merged) > 2:
# Linear regression
slope, intercept = np.polyfit(merged[weather_measurement], merged[measurement], 1)
x_range = np.linspace(merged[weather_measurement].min(), merged[weather_measurement].max(), 100)
y_range = slope * x_range + intercept
correlation_fig.add_trace(go.Scatter(
x=x_range,
y=y_range,
mode='lines',
line=dict(color='red', width=2),
name=f'Fit: y = {slope:.2f}x + {intercept:.2f}'
))
correlation_fig.update_layout(
title=f"Correlation with Weather - {correlation['correlation']:.2f} (p={correlation['p_value']:.4f})",
xaxis_title=weather_measurement.replace('_', ' ').title(),
yaxis_title=measurement.replace('_', ' ').title(),
hovermode="closest"
)
return time_series_fig, pattern_fig, correlation_fig
@dash_app.callback(
Output('forecast-graph', 'figure'),
[Input('generate-forecast', 'n_clicks')],
[State('forecast-sensor-dropdown', 'value'),
State('forecast-measurement-dropdown', 'value'),
State('forecast-period-slider', 'value')]
)
def update_forecast(n_clicks, sensor_id, measurement, periods):
if not n_clicks or not sensor_id or not measurement:
# Return empty figure
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'Click "Generate Forecast" to see predictions',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig
# Get historical data for the past 14 days
end_time = datetime.datetime.now().isoformat()
start_time = (datetime.datetime.now() - datetime.timedelta(days=14)).isoformat()
# Get sensor data
readings = db.get_readings_for_period(start_time, end_time, [sensor_id])
if readings.empty or measurement not in readings.columns or readings[measurement].isna().all():
# Return empty figure
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'No data available for forecast',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig
# Get sensor location info
sensor_info = db.get_sensor_info()
location_info = sensor_info[sensor_info['id'] == sensor_id].iloc[0]
location_name = f"{location_info['building']} - {location_info['location_name']}"
# Generate forecast
forecast = analyzer.forecast_trends(readings, measurement, periods)
if forecast is None:
# Return empty figure
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'Insufficient data for forecasting',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig
# Create figure with both historical and forecasted data
fig = go.Figure()
# Add historical data
fig.add_trace(go.Scatter(
x=pd.to_datetime(readings['timestamp']),
y=readings[measurement],
mode='lines',
name='Historical Data',
line=dict(color='blue')
))
# Add forecasted data
forecast_col = f'forecast_{measurement}'
fig.add_trace(go.Scatter(
x=pd.to_datetime(forecast['timestamp']),
y=forecast[forecast_col],
mode='lines',
name='Forecast',
line=dict(color='red', dash='dash')
))
# Get threshold values for the measurement
thresholds = db.get_thresholds(location_id=location_info['location_id'])
measurement_thresholds = thresholds[thresholds['measurement_type'] == measurement]
if not measurement_thresholds.empty:
min_val = measurement_thresholds['min_value'].iloc[0]
max_val = measurement_thresholds['max_value'].iloc[0]
if min_val is not None:
fig.add_shape(
type="line",
x0=readings['timestamp'].min(),
y0=min_val,
x1=forecast['timestamp'].max(),
y1=min_val,
line=dict(color="orange", width=2, dash="dot")
)
if max_val is not None:
fig.add_shape(
type="line",
x0=readings['timestamp'].min(),
y0=max_val,
x1=forecast['timestamp'].max(),
y1=max_val,
line=dict(color="orange", width=2, dash="dot")
)
fig.update_layout(
title=f"{measurement.replace('_', ' ').title()} Forecast - {location_name}",
xaxis_title="Time",
yaxis_title=measurement.replace('_', ' ').title(),
hovermode="x unified"
)
# Add vertical line separating historical and forecast data
current_time = datetime.datetime.now()
fig.add_shape(
type="line",
x0=current_time,
y0=readings[measurement].min(),
x1=current_time,
y1=readings[measurement].max(),
line=dict(color="green", width=2, dash="dash")
)
fig.add_annotation(
x=current_time,
y=readings[measurement].max(),
text="Now",
showarrow=True,
arrowhead=1
)
return fig
# Alert monitoring thread
def monitor_alerts():
"""Background thread to monitor and process alerts."""
while True:
try:
# Get unprocessed alerts (last hour, unacknowledged)
alerts = db.get_alerts(
start_time=(datetime.datetime.now() - datetime.timedelta(hours=1)).isoformat(),
acknowledged=False
)
if not alerts.empty:
for _, alert in alerts.iterrows():
# Process the alert
notification_manager.process_alert(alert)
# Mark as acknowledged so we don't process it again
db.acknowledge_alert(alert['id'])
# Sleep for a while
time.sleep(60) # Check every minute
except Exception as e:
logger.error(f"Error in alert monitoring thread: {str(e)}")
time.sleep(300) # Sleep longer on error
# Start the alert monitoring thread
alert_thread = threading.Thread(target=monitor_alerts, daemon=True)
alert_thread.start()
if __name__ == '__main__':
server.run(debug=True, host='0.0.0.0', port=5000)
Resources:
- Raspberry Pi Documentation
- Arduino with Python
- InfluxDB for Time Series Data
- Grafana for Monitoring Dashboards
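To connect the hardware side to the API above, here is a minimal client sketch showing how a Raspberry Pi (or any Python-capable device) might push readings to the /api/sensors/<sensor_id>/readings endpoint with the requests library. The host, port, sensor ID, and payload fields below are illustrative assumptions; match them to the sensors you actually register and the measurement columns your database expects.
# sensor_client.py - hypothetical Raspberry Pi client for the monitoring API above
import time
import requests

API_BASE = "http://localhost:5000"  # assumed host/port from server.run(...)
SENSOR_ID = 1                       # example sensor ID already registered in the database

def read_sensor():
    """Stand-in for real hardware access (e.g. a DHT22 read via a GPIO library)."""
    return {
        "temperature": 22.4,  # degrees Celsius
        "humidity": 48.0,     # percent relative humidity
        "co2_level": 650      # parts per million
    }

def post_reading():
    """Send one reading to the Flask API and raise if the server rejects it."""
    response = requests.post(
        f"{API_BASE}/api/sensors/{SENSOR_ID}/readings",
        json=read_sensor(),
        timeout=10
    )
    response.raise_for_status()
    return response.json()

if __name__ == "__main__":
    while True:
        try:
            print(post_reading())
        except requests.RequestException as error:
            print(f"Failed to send reading: {error}")
        time.sleep(60)  # one reading per minute
In a real deployment you would typically run a script like this as a systemd service or cron job on the device and point API_BASE at the machine hosting the Flask server.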
Tips for Project Success {#tips}
To make the most of your Python project experience:
- Start with planning: Create a detailed project plan before writing any code
- Use version control: Set up Git from the beginning to track changes
- Document thoroughly: Write clear documentation as you go
- Test consistently: Implement unit tests to verify core functionality (see the pytest sketch after this list)
- Seek feedback: Share your work with peers and instructors
- Address real problems: Focus on solving genuine issues to stay motivated
- Present professionally: Create demos and presentations of your finished project
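As a concrete example of the testing tip above, here is a small pytest sketch for the environmental monitoring API. It assumes the project code lives in a module named monitoring.py that exposes the Flask app as server and the database wrapper as db; both names are assumptions for illustration, so adjust the import to match your own file layout.
# test_api.py - minimal pytest sketch for the /api/sensors endpoint
import pandas as pd
import monitoring  # hypothetical module containing the Flask app and db from the project above

def test_get_sensors_returns_json(monkeypatch):
    # Stub the database call so the test needs no real sensors or database file
    fake_sensors = pd.DataFrame([
        {"id": 1, "location_name": "Lab 101", "sensor_type": "DHT22"}
    ])
    monkeypatch.setattr(monitoring.db, "get_sensor_info", lambda: fake_sensors)

    client = monitoring.server.test_client()
    response = client.get("/api/sensors")

    assert response.status_code == 200
    data = response.get_json()
    assert data[0]["location_name"] == "Lab 101"
Run it with pytest test_api.py; stubbing the database keeps the test fast and repeatable, which makes it easy to wire into a CI pipeline alongside the version-control habit mentioned above.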
FAQs About Python Projects {#faqs}
How long should a college Python project take to complete?
The timeframe varies depending on project complexity and your experience level. Simple projects might take 2-4 weeks, while more complex ones can span an entire semester. Set realistic milestones and track your progress.
Do I need advanced Python knowledge to complete these projects?
While some projects require intermediate to advanced skills, you can modify any project to match your current skill level. Start with components you understand and gradually incorporate more complex elements as you learn.
How do I showcase these projects to potential employers?
Create a GitHub portfolio with well-documented repositories, include demonstration videos, develop case studies explaining your process, and consider hosting live demos for web-based projects. Make sure to highlight the problems solved and skills demonstrated.
Can I work on these projects in teams?
Absolutely! Many of these projects benefit from collaborative work. Divide responsibilities based on team members’ strengths and use project management tools to coordinate efforts.
What if I encounter problems I can’t solve?
This is normal and part of the learning process. Utilize online communities like Stack Overflow, GitHub Discussions, or Reddit’s r/learnpython. Don’t hesitate to consult with professors or teaching assistants for guidance.
How can I extend these projects for my thesis or capstone?
To extend these projects for advanced academic work, consider adding research components, conducting user studies, implementing more sophisticated algorithms, or combining multiple projects into larger systems with broader implications.
Conclusion
These top 10 Python projects offer excellent opportunities for college students to develop practical skills while building impressive portfolio pieces. By tackling real-world problems with Python’s powerful capabilities, you’ll position yourself for success in today’s competitive tech landscape.
Remember that the journey of completing these projects is as valuable as the final result. Each challenge you overcome strengthens your problem-solving abilities and deepens your understanding of programming concepts.
We’d love to hear about your experiences with these projects! Share your progress in the comments below or reach out with questions if you need guidance getting started.
[Interested in more programming resources? Check out our complete Python learning path for beginners]
About the Author
Harshit Arya – I am a robotics and machine learning enthusiast who enjoys building innovative projects and solving real-world problems. I have worked on many Python, web, and app-based projects, and I am a quick learner and problem solver.
Tags: Python projects, college students, programming projects, computer science projects, Python programming, student portfolio, coding projects