Last updated: March 16, 2025


Are you a college student looking to enhance your programming portfolio with impressive Python projects? Look no further! Python continues to dominate as one of the most versatile and in-demand programming languages in 2025. Whether you’re a beginner or have intermediate skills, these innovative Python projects will boost your resume, strengthen your coding abilities, and give you practical experience that employers are actively seeking.

Why Python Projects Are Essential for College Students

Python’s simplicity and versatility make it the perfect language for college projects. Here’s why building Python projects during your college years is crucial:

  • Industry Relevance: Python skills are highly sought after in fields ranging from data science to web development
  • Practical Application: Projects transform theoretical knowledge into practical skills
  • Portfolio Building: Having completed projects demonstrates your capabilities to potential employers
  • Problem-Solving Skills: Project development enhances your ability to solve real-world problems
  • Career Advancement: Python expertise opens doors to internships and entry-level positions

How to Choose the Right Project

Before diving into our recommended projects, consider these factors when selecting a Python project:

  1. Align with your interests: Choose projects in domains you’re passionate about
  2. Match your skill level: Start with simpler projects if you’re a beginner
  3. Consider time constraints: Be realistic about what you can accomplish
  4. Think about uniqueness: Add your own creative twist to stand out
  5. Focus on marketable skills: Select projects that teach in-demand technologies

Top 10 Python Projects for College Students

1. AI-Powered Personal Study Assistant


Difficulty Level: Intermediate to Advanced

Skills Developed: Natural Language Processing, Machine Learning, API Integration

Project Description: Build an AI-powered study assistant that helps college students manage their learning process. This application uses natural language processing to understand questions, provide explanations, create flashcards from notes, and recommend additional resources based on the student’s learning patterns.
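
At its core, the assistant wraps a pretrained question-answering model. A minimal sketch using the Hugging Face transformers pipeline (it downloads a default SQuAD-tuned English model on first run):

from transformers import pipeline

qa = pipeline("question-answering")  # downloads a default QA model on first use
context = "Photosynthesis converts light energy into chemical energy that plants store as sugar."
result = qa(question="What does photosynthesis convert?", context=context)
print(result["answer"], round(result["score"], 3))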

Key Features:

  • Question-answering system using transformers
  • Automatic summarization of lecture notes
  • Personalized study plan generation
  • Integration with calendar for study reminders
  • Voice command capability

Implementation Steps:

  1. Set up a Python environment with the necessary libraries (transformers, TensorFlow/PyTorch); see the sample requirements file after this list
  2. Implement the NLP models for understanding user queries
  3. Create a knowledge base system for storing and retrieving information
  4. Develop a simple GUI using Tkinter or a web interface with Flask
  5. Implement a recommendation system for study resources
  6. Add voice recognition using libraries like SpeechRecognition
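
For step 1, the dependencies can be captured in a requirements file. One possible requirements.txt (versions are left unpinned here; PyAudio is only needed for microphone input):

# requirements.txt
transformers
torch
SpeechRecognition
pyttsx3
PyAudio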

Sample Code Implementation:

# study_assistant.py
import json
import speech_recognition as sr
from tkinter import *
from tkinter import messagebox, ttk
from transformers import pipeline, BartTokenizer, BartForConditionalGeneration
import pyttsx3

class StudyAssistant:
    def __init__(self, root):
        self.root = root
        self.root.title("AI Study Assistant")
        self.root.geometry("800x600")
        
        # Initialize NLP components
        self.qa_model = pipeline("question-answering")
        self.summarizer_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
        self.summarizer_model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn")
        
        # Initialize speech components
        self.engine = pyttsx3.init()
        self.recognizer = sr.Recognizer()
        
        # Initialize knowledge base
        self.knowledge_base = self.load_knowledge_base()
        
        # Create UI
        self.create_widgets()
    
    def load_knowledge_base(self):
        try:
            with open("knowledge_base.json", "r") as f:
                return json.load(f)
        except FileNotFoundError:
            # Create default knowledge base if file doesn't exist
            default_kb = {
                "subjects": [],
                "notes": {},
                "flashcards": {},
                "resources": {}
            }
            with open("knowledge_base.json", "w") as f:
                json.dump(default_kb, f)
            return default_kb
    
    def save_knowledge_base(self):
        with open("knowledge_base.json", "w") as f:
            json.dump(self.knowledge_base, f)
    
    def create_widgets(self):
        # Create tabs
        self.tab_control = ttk.Notebook(self.root)
        
        self.tab_qa = Frame(self.tab_control)
        self.tab_notes = Frame(self.tab_control)
        self.tab_flashcards = Frame(self.tab_control)
        self.tab_plan = Frame(self.tab_control)
        
        self.tab_control.add(self.tab_qa, text="Ask Questions")
        self.tab_control.add(self.tab_notes, text="Notes Manager")
        self.tab_control.add(self.tab_flashcards, text="Flashcards")
        self.tab_control.add(self.tab_plan, text="Study Plan")
        
        self.tab_control.pack(expand=1, fill="both")
        
        # Question-answering tab
        Label(self.tab_qa, text="Ask any study question:", font=("Arial", 14)).pack(pady=10)
        
        self.context_frame = Frame(self.tab_qa)
        self.context_frame.pack(pady=10, fill=X, padx=20)
        
        Label(self.context_frame, text="Context (optional):", font=("Arial", 12)).pack(anchor=W)
        self.context_text = Text(self.context_frame, height=8)
        self.context_text.pack(fill=X)
        
        self.question_frame = Frame(self.tab_qa)
        self.question_frame.pack(pady=10, fill=X, padx=20)
        
        Label(self.question_frame, text="Your question:", font=("Arial", 12)).pack(anchor=W)
        self.question_entry = Entry(self.question_frame, font=("Arial", 12))
        self.question_entry.pack(fill=X)
        
        Button(self.tab_qa, text="Ask", command=self.answer_question, bg="#4CAF50", fg="white", font=("Arial", 12)).pack(pady=10)
        Button(self.tab_qa, text="Voice Input", command=self.voice_input, bg="#2196F3", fg="white", font=("Arial", 12)).pack(pady=5)
        
        Label(self.tab_qa, text="Answer:", font=("Arial", 12)).pack(anchor=W, padx=20, pady=5)
        self.answer_text = Text(self.tab_qa, height=10, wrap=WORD)
        self.answer_text.pack(fill=BOTH, expand=True, padx=20, pady=5)
        
        # Implement other tabs similarly (notes, flashcards, plan)
        # ...
    
    def answer_question(self):
        question = self.question_entry.get()
        context = self.context_text.get("1.0", END)
        
        if not question:
            messagebox.showwarning("Input Required", "Please enter a question.")
            return
        
        if not context.strip():
            # Search knowledge base if no context provided
            for subject, notes in self.knowledge_base["notes"].items():
                result = self.qa_model(question=question, context=notes)
                if result["score"] > 0.7:  # Confidence threshold
                    self.answer_text.delete("1.0", END)
                    self.answer_text.insert(END, result["answer"])
                    return
            
            self.answer_text.delete("1.0", END)
            self.answer_text.insert(END, "I don't have enough information to answer that question. Please provide some context or add relevant notes to your knowledge base.")
        else:
            # Use provided context
            result = self.qa_model(question=question, context=context)
            self.answer_text.delete("1.0", END)
            self.answer_text.insert(END, result["answer"])
    
    def voice_input(self):
        try:
            with sr.Microphone() as source:
                self.answer_text.delete("1.0", END)
                self.answer_text.insert(END, "Listening... Speak now.")
                self.root.update()
                
                audio = self.recognizer.listen(source)
                text = self.recognizer.recognize_google(audio)
                
                self.question_entry.delete(0, END)
                self.question_entry.insert(0, text)
                
                self.answer_text.delete("1.0", END)
                self.answer_text.insert(END, f"Recognized: {text}")
        except Exception as e:
            self.answer_text.delete("1.0", END)
            self.answer_text.insert(END, f"Error: {str(e)}")
    
    def summarize_text(self, text):
        inputs = self.summarizer_tokenizer(text, max_length=1024, return_tensors="pt", truncation=True)
        summary_ids = self.summarizer_model.generate(inputs["input_ids"], num_beams=4, min_length=30, max_length=100, early_stopping=True)
        summary = self.summarizer_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
        return summary
    
    # Additional methods for other features would go here
    # ...

if __name__ == "__main__":
    root = Tk()
    app = StudyAssistant(root)
    root.mainloop()
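
The flashcard feature from the list above is not shown in the sample. A minimal heuristic sketch that turns "Term: definition" note lines into flashcards matching the knowledge_base JSON structure (a fuller implementation might generate questions with the NLP models instead):

# flashcards.py - heuristic flashcard generation from "Term: definition" notes
import json

def notes_to_flashcards(notes_text):
    """Turn lines of the form 'Term: definition' into question/answer pairs."""
    cards = []
    for line in notes_text.splitlines():
        if ":" in line:
            term, definition = line.split(":", 1)
            cards.append({"question": f"What is {term.strip()}?",
                          "answer": definition.strip()})
    return cards

notes = "Osmosis: diffusion of water across a semipermeable membrane\nMitosis: cell division that produces two identical daughter cells"
print(json.dumps(notes_to_flashcards(notes), indent=2))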

2. Blockchain-Based Academic Credential Verification System


Difficulty Level: Advanced

Skills Developed: Blockchain, Cryptography, Web Development, Database Management

Project Description: Create a secure system for verifying academic credentials using blockchain technology. This project allows educational institutions to issue digital certificates that can be instantly verified by employers, eliminating certificate fraud and streamlining the verification process.

Key Features:

  • Blockchain implementation for storing credential hashes
  • Digital signature verification
  • QR code generation for easy credential sharing
  • Admin portal for educational institutions
  • Verification portal for employers

Implementation Steps:

  1. Set up a Python environment with blockchain libraries (e.g., web3.py); see the connection check after this list
  2. Design the database schema for storing credential metadata
  3. Implement the blockchain component for secure verification
  4. Create the web interface using Flask or Django
  5. Develop the digital signature system
  6. Implement the QR code generation system
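
For step 1, it helps to confirm that web3.py can actually reach a node before building on it. A minimal check, assuming web3.py v6 and a local development chain (such as Ganache or Hardhat) on port 8545:

# chain_check.py - verify the connection to a local Ethereum node
from web3 import Web3

w3 = Web3(Web3.HTTPProvider("http://127.0.0.1:8545"))

if w3.is_connected():
    print("Connected. Latest block:", w3.eth.block_number)
else:
    print("No node found. Start a local chain before running the app.")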

Sample Code Implementation:

# app.py - Main Flask application
from flask import Flask, render_template, request
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash
from web3 import Web3
import qrcode
import hashlib
from io import BytesIO
import base64
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import datetime

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'  # use a strong random value in production
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///credentials.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

# Connect to local Ethereum node (would connect to actual network in production)
w3 = Web3(Web3.HTTPProvider('http://127.0.0.1:8545'))

# Simple blockchain implementation (in production, would use actual Ethereum network)
class SimpleBlockchain:
    def __init__(self):
        self.chain = []
        self.create_genesis_block()
    
    def create_genesis_block(self):
        # First block in the chain
        self.chain.append({
            'index': 1,
            'timestamp': str(datetime.datetime.now()),
            'data': "Genesis Block",
            'prev_hash': "0",
            'hash': self.hash_block("Genesis Block")
        })
    
    def hash_block(self, data):
        # Create SHA-256 hash of block
        return hashlib.sha256(str(data).encode()).hexdigest()
    
    def add_credential(self, credential_data):
        # Add new credential to blockchain
        prev_block = self.chain[-1]
        new_block = {
            'index': len(self.chain) + 1,
            'timestamp': str(datetime.datetime.now()),
            'data': credential_data,
            'prev_hash': prev_block['hash'],
            'hash': self.hash_block(credential_data + prev_block['hash'])
        }
        self.chain.append(new_block)
        return new_block['hash']
    
    def verify_credential(self, credential_hash):
        # Check if credential hash exists in blockchain
        for block in self.chain:
            if block['hash'] == credential_hash:
                return True
        return False

# Initialize blockchain
blockchain = SimpleBlockchain()

# Database models
class Institution(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    email = db.Column(db.String(100), unique=True, nullable=False)
    password_hash = db.Column(db.String(200), nullable=False)
    public_key = db.Column(db.Text, nullable=False)
    credentials = db.relationship('Credential', backref='issuer', lazy=True)

class Credential(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    student_name = db.Column(db.String(100), nullable=False)
    student_id = db.Column(db.String(50), nullable=False)
    credential_type = db.Column(db.String(50), nullable=False)
    issue_date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    blockchain_hash = db.Column(db.String(200), nullable=False)
    digital_signature = db.Column(db.Text, nullable=False)
    institution_id = db.Column(db.Integer, db.ForeignKey('institution.id'), nullable=False)

# Routes
@app.route('/')
def index():
    return render_template('index.html')

@app.route('/register', methods=['GET', 'POST'])
def register_institution():
    if request.method == 'POST':
        name = request.form.get('name')
        email = request.form.get('email')
        password = request.form.get('password')
        
        # Check if institution already exists
        if Institution.query.filter_by(email=email).first():
            return "Institution already registered"
        
        # Generate RSA key pair
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
        )
        
        public_key = private_key.public_key()
        
        # Serialize public key for storage
        public_key_pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode('utf-8')
        
        # Serialize private key for the institution to download
        private_key_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        ).decode('utf-8')
        
        # Create new institution
        new_institution = Institution(
            name=name,
            email=email,
            password_hash=generate_password_hash(password),
            public_key=public_key_pem
        )
        
        db.session.add(new_institution)
        db.session.commit()
        
        # In a real application, you would securely provide the private key to the institution
        return render_template('private_key.html', private_key=private_key_pem)
    
    return render_template('register.html')

@app.route('/issue', methods=['GET', 'POST'])
def issue_credential():
    # In a real app, you would verify the institution is logged in
    if request.method == 'POST':
        student_name = request.form.get('student_name')
        student_id = request.form.get('student_id')
        credential_type = request.form.get('credential_type')
        institution_id = request.form.get('institution_id')
        signature = request.form.get('signature')  # Digital signature would be created client-side
        
        # Create credential data
        credential_data = f"{student_name}:{student_id}:{credential_type}:{institution_id}:{datetime.datetime.now()}"
        
        # Add to blockchain
        blockchain_hash = blockchain.add_credential(credential_data)
        
        # Create new credential record
        new_credential = Credential(
            student_name=student_name,
            student_id=student_id,
            credential_type=credential_type,
            blockchain_hash=blockchain_hash,
            digital_signature=signature,
            institution_id=institution_id
        )
        
        db.session.add(new_credential)
        db.session.commit()
        
        # Generate QR code
        qr = qrcode.QRCode(
            version=1,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data(f"https://example.com/verify/{blockchain_hash}")
        qr.make(fit=True)
        
        img = qr.make_image(fill_color="black", back_color="white")
        buffered = BytesIO()
        img.save(buffered, format="PNG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        
        return render_template('credential_issued.html', 
                               credential=new_credential, 
                               qr_code=img_str)
    
    institutions = Institution.query.all()
    return render_template('issue.html', institutions=institutions)

@app.route('/verify/<credential_hash>', methods=['GET'])
def verify_credential(credential_hash):
    # Check if credential exists in blockchain
    is_valid = blockchain.verify_credential(credential_hash)
    
    if is_valid:
        # Get credential details from database
        credential = Credential.query.filter_by(blockchain_hash=credential_hash).first()
        if credential:
            return render_template('verify.html', 
                                  credential=credential, 
                                  institution=credential.issuer,
                                  is_valid=True)
    
    return render_template('verify.html', is_valid=False)

if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)
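
Note that the sample stores digital signatures but never checks them. A minimal sketch of the signing and verification flow with the cryptography library's RSA-PSS, using the same kind of key pair generated at registration:

# signature_demo.py - sign and verify credential data with RSA-PSS
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa

private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

credential_data = b"Jane Doe:S12345:BSc Computer Science"
pss = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)

signature = private_key.sign(credential_data, pss, hashes.SHA256())

try:
    public_key.verify(signature, credential_data, pss, hashes.SHA256())
    print("Signature valid: credential is authentic")
except InvalidSignature:
    print("Signature invalid: credential may be forged or altered")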

3. Smart Personal Finance Manager with Predictive Analytics


Difficulty Level: Intermediate

Skills Developed: Data Analysis, Machine Learning, Data Visualization, API Integration

Project Description: Develop a comprehensive personal finance manager that helps college students track expenses, set budgets, and make financial decisions. What sets this project apart is its predictive analytics component that forecasts future expenses based on spending patterns and suggests ways to optimize finances.

Key Features:

  • Expense tracking and categorization
  • Budget setting and monitoring
  • Bank account integration via secure APIs (see the sketch after this list)
  • Expense prediction using machine learning
  • Interactive data visualizations
  • Customized financial advice
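
Bank integration is normally done through an aggregator service rather than direct bank connections. The sketch below is hypothetical: fetch_bank_transactions stands in for a real aggregator SDK call (such as Plaid's transactions endpoints) and is not an actual API, but it shows how imported records would map onto the FinanceManager class in the sample code below:

# bank_sync.py - hypothetical sketch; fetch_bank_transactions is NOT a real API
def fetch_bank_transactions(access_token, start_date, end_date):
    """Placeholder for an aggregator call returning raw bank transactions."""
    return [
        {"date": "2025-03-01", "amount": 42.50, "name": "CAMPUS GROCERY"},
        {"date": "2025-03-02", "amount": 9.99, "name": "STREAMING SVC"},
    ]

def import_into_manager(manager, user_id, raw_transactions):
    """Map raw bank records onto the FinanceManager schema."""
    for tx in raw_transactions:
        manager.add_transaction(
            user_id,
            tx["date"],
            abs(tx["amount"]),
            category="Other",            # re-categorize later, e.g. with a classifier
            description=tx["name"],
            is_income=tx["amount"] < 0,  # many aggregators report deposits as negative
        )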

Implementation Steps:

  1. Set up a Python environment with data analysis libraries (pandas, NumPy)
  2. Implement the database for storing financial data
  3. Create models for expense categorization (see the classifier sketch after this list)
  4. Develop prediction algorithms using scikit-learn
  5. Build visualization components with Matplotlib or Plotly
  6. Create a user-friendly interface with Streamlit or Flask
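
For step 3, transaction descriptions can be categorized automatically with a small text classifier. A minimal sketch with scikit-learn, trained here on a toy sample (a real app would learn from the user's own labeled history):

# categorizer.py - TF-IDF + logistic regression expense categorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline

descriptions = ["pizza night", "monthly rent", "bus pass", "movie tickets",
                "electric bill", "textbook purchase", "grocery run", "concert ticket"]
labels = ["Food", "Rent", "Transportation", "Entertainment",
          "Utilities", "Education", "Food", "Entertainment"]

model = make_pipeline(TfidfVectorizer(), LogisticRegression(max_iter=1000))
model.fit(descriptions, labels)

print(model.predict(["late night pizza", "rent for april"]))
# Expected: ['Food' 'Rent'] on this toy sample; outputs depend on training data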

Sample Code Implementation:

# finance_manager.py
import streamlit as st
import pandas as pd
import plotly.express as px
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import LabelEncoder
from datetime import datetime, timedelta
import sqlite3
import uuid
import hashlib

class FinanceManager:
    def __init__(self):
        self.conn = sqlite3.connect('finance.db', check_same_thread=False)
        self.create_tables()
        
    def create_tables(self):
        cursor = self.conn.cursor()
        
        # Create users table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id TEXT PRIMARY KEY,
            username TEXT UNIQUE,
            password TEXT,
            created_at TIMESTAMP
        )
        ''')
        
        # Create transactions table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS transactions (
            id TEXT PRIMARY KEY,
            user_id TEXT,
            date TIMESTAMP,
            amount REAL,
            category TEXT,
            description TEXT,
            is_income BOOLEAN,
            FOREIGN KEY (user_id) REFERENCES users (id)
        )
        ''')
        
        # Create budgets table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS budgets (
            id TEXT PRIMARY KEY,
            user_id TEXT,
            category TEXT,
            amount REAL,
            period TEXT,
            FOREIGN KEY (user_id) REFERENCES users (id)
        )
        ''')
        
        self.conn.commit()
    
    def register_user(self, username, password):
        cursor = self.conn.cursor()
        user_id = str(uuid.uuid4())
        # Demo only: use a salted password hash (e.g. bcrypt or werkzeug) in production
        hashed_password = hashlib.sha256(password.encode()).hexdigest()
        
        try:
            cursor.execute(
                "INSERT INTO users (id, username, password, created_at) VALUES (?, ?, ?, ?)",
                (user_id, username, hashed_password, datetime.now())
            )
            self.conn.commit()
            return True
        except sqlite3.IntegrityError:
            return False
    
    def authenticate_user(self, username, password):
        cursor = self.conn.cursor()
        hashed_password = hashlib.sha256(password.encode()).hexdigest()
        
        cursor.execute(
            "SELECT id FROM users WHERE username = ? AND password = ?",
            (username, hashed_password)
        )
        
        result = cursor.fetchone()
        return result[0] if result else None
    
    def add_transaction(self, user_id, date, amount, category, description, is_income):
        cursor = self.conn.cursor()
        transaction_id = str(uuid.uuid4())
        
        cursor.execute(
            "INSERT INTO transactions (id, user_id, date, amount, category, description, is_income) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (transaction_id, user_id, date, amount, category, description, is_income)
        )
        
        self.conn.commit()
        return transaction_id
    
    def get_transactions(self, user_id, start_date=None, end_date=None):
        cursor = self.conn.cursor()
        query = "SELECT * FROM transactions WHERE user_id = ?"
        params = [user_id]
        
        if start_date:
            query += " AND date >= ?"
            params.append(start_date)
        
        if end_date:
            query += " AND date <= ?"
            params.append(end_date)
        
        cursor.execute(query, params)
        columns = [col[0] for col in cursor.description]
        transactions = [dict(zip(columns, row)) for row in cursor.fetchall()]
        
        return transactions
    
    def set_budget(self, user_id, category, amount, period):
        cursor = self.conn.cursor()
        budget_id = str(uuid.uuid4())
        
        # Check if budget already exists
        cursor.execute(
            "SELECT id FROM budgets WHERE user_id = ? AND category = ? AND period = ?",
            (user_id, category, period)
        )
        
        existing_budget = cursor.fetchone()
        
        if existing_budget:
            # Update existing budget
            cursor.execute(
                "UPDATE budgets SET amount = ? WHERE id = ?",
                (amount, existing_budget[0])
            )
        else:
            # Create new budget
            cursor.execute(
                "INSERT INTO budgets (id, user_id, category, amount, period) VALUES (?, ?, ?, ?, ?)",
                (budget_id, user_id, category, amount, period)
            )
        
        self.conn.commit()
    
    def get_budgets(self, user_id, period=None):
        cursor = self.conn.cursor()
        query = "SELECT * FROM budgets WHERE user_id = ?"
        params = [user_id]
        
        if period:
            query += " AND period = ?"
            params.append(period)
        
        cursor.execute(query, params)
        columns = [col[0] for col in cursor.description]
        budgets = [dict(zip(columns, row)) for row in cursor.fetchall()]
        
        return budgets
    
    def predict_expenses(self, user_id, prediction_months=3):
        # Get past transactions
        transactions = self.get_transactions(user_id)
        
        if not transactions:
            return None
        
        # Convert to DataFrame
        df = pd.DataFrame(transactions)
        df['date'] = pd.to_datetime(df['date'])
        
        # Filter for expenses only
        expenses_df = df[df['is_income'] == 0].copy()
        
        if expenses_df.empty:
            return None
        
        # Extract features
        expenses_df['month'] = expenses_df['date'].dt.month
        expenses_df['year'] = expenses_df['date'].dt.year
        expenses_df['day_of_month'] = expenses_df['date'].dt.day
        expenses_df['day_of_week'] = expenses_df['date'].dt.dayofweek
        
        # Encode categorical features
        le = LabelEncoder()
        expenses_df['category_encoded'] = le.fit_transform(expenses_df['category'])
        
        # Group by month and category to get monthly spending per category
        monthly_by_category = expenses_df.groupby(['year', 'month', 'category']).agg({
            'amount': 'sum',
            'category_encoded': 'first'
        }).reset_index()
        
        # Prepare training data
        X = monthly_by_category[['year', 'month', 'category_encoded']]
        y = monthly_by_category['amount']
        
        # Train model
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X, y)
        
        # Generate future dates for prediction
        last_date = expenses_df['date'].max()
        future_dates = []
        categories = expenses_df['category'].unique()
        
        for i in range(1, prediction_months + 1):
            future_date = last_date + pd.DateOffset(months=i)
            for category in categories:
                category_encoded = le.transform([category])[0]
                future_dates.append({
                    'year': future_date.year,
                    'month': future_date.month,
                    'category': category,
                    'category_encoded': category_encoded
                })
        
        future_df = pd.DataFrame(future_dates)
        
        # Make predictions
        future_df['predicted_amount'] = model.predict(future_df[['year', 'month', 'category_encoded']])
        
        # Format results
        predictions = future_df.groupby(['year', 'month']).agg({
            'predicted_amount': 'sum'
        }).reset_index()
        
        predictions_by_category = future_df.copy()
        
        return {
            'total_by_month': predictions.to_dict('records'),
            'by_category': predictions_by_category.to_dict('records')
        }
    
    def visualize_spending(self, user_id, period="last6months"):
        transactions = self.get_transactions(user_id)
        
        if not transactions:
            return None
        
        df = pd.DataFrame(transactions)
        df['date'] = pd.to_datetime(df['date'])
        
        # Filter by period
        now = datetime.now()
        if period == "last6months":
            start_date = now - timedelta(days=180)
        elif period == "last12months":
            start_date = now - timedelta(days=365)
        elif period == "thisyear":
            start_date = datetime(now.year, 1, 1)
        else:
            start_date = df['date'].min()
        
        df = df[df['date'] >= start_date]
        
        # Separate income and expenses
        income_df = df[df['is_income'] == 1]
        expenses_df = df[df['is_income'] == 0]
        
        # Monthly totals
        df['month_year'] = df['date'].dt.strftime('%Y-%m')
        monthly_totals = df.groupby(['month_year', 'is_income']).agg({
            'amount': 'sum'
        }).reset_index()
        
        # Category breakdown
        category_totals = expenses_df.groupby('category').agg({
            'amount': 'sum'
        }).reset_index().sort_values('amount', ascending=False)
        
        # Time series data
        time_series = df.groupby(['date', 'is_income']).agg({
            'amount': 'sum'
        }).reset_index()
        
        return {
            'monthly_totals': monthly_totals.to_dict('records'),
            'category_totals': category_totals.to_dict('records'),
            'time_series': time_series.to_dict('records')
        }

# Streamlit app implementation
def main():
    st.set_page_config(page_title="Smart Finance Manager", layout="wide")
    
    finance_manager = FinanceManager()
    
    # Authentication
    if 'user_id' not in st.session_state:
        st.session_state.user_id = None
    
    if st.session_state.user_id is None:
        st.title("Smart Personal Finance Manager")
        tab1, tab2 = st.tabs(["Login", "Register"])
        
        with tab1:
            st.subheader("Login")
            username = st.text_input("Username", key="login_username")
            password = st.text_input("Password", type="password", key="login_password")
            
            if st.button("Login"):
                user_id = finance_manager.authenticate_user(username, password)
                if user_id:
                    st.session_state.user_id = user_id
                    st.rerun()
                else:
                    st.error("Invalid username or password")
        
        with tab2:
            st.subheader("Register")
            username = st.text_input("Username", key="register_username")
            password = st.text_input("Password", type="password", key="register_password")
            confirm_password = st.text_input("Confirm Password", type="password")
            
            if st.button("Register"):
                if password != confirm_password:
                    st.error("Passwords do not match")
                elif not username or not password:
                    st.error("Username and password are required")
                else:
                    success = finance_manager.register_user(username, password)
                    if success:
                        st.success("Registration successful. Please log in.")
                    else:
                        st.error("Username already exists")
        
        return
    
    # Main application (user is logged in)
    st.title("Smart Personal Finance Manager")
    
    if st.button("Logout", key="logout"):
        st.session_state.user_id = None
        st.rerun()
    
    tab1, tab2, tab3, tab4 = st.tabs(["Dashboard", "Transactions", "Budget", "Predictions"])
    
    with tab1:
        st.header("Dashboard")
        
        # Get visualizations
        visualizations = finance_manager.visualize_spending(st.session_state.user_id)
        
        if not visualizations:
            st.info("No data to display. Please add some transactions.")
        else:
            # Convert to DataFrames
            monthly_df = pd.DataFrame(visualizations['monthly_totals'])
            category_df = pd.DataFrame(visualizations['category_totals'])
            time_series_df = pd.DataFrame(visualizations['time_series'])
            
            # Monthly Income vs Expenses
            st.subheader("Monthly Income vs Expenses")
            monthly_pivot = monthly_df.pivot(index='month_year', columns='is_income', values='amount').reset_index()
            monthly_pivot = monthly_pivot.rename(columns={0: 'expenses', 1: 'income'})
            # Guard against months that have only income or only expenses
            for col in ('income', 'expenses'):
                if col not in monthly_pivot.columns:
                    monthly_pivot[col] = 0
            monthly_pivot = monthly_pivot.fillna(0)
            
            fig1 = px.bar(monthly_pivot, x='month_year', y=['income', 'expenses'],
                          title="Monthly Income vs Expenses",
                          labels={'value': 'Amount', 'month_year': 'Month'},
                          barmode='group',
                          color_discrete_map={'income': 'green', 'expenses': 'red'})
            st.plotly_chart(fig1, use_container_width=True)
            
            # Expense categories
            st.subheader("Expense Categories")
            fig2 = px.pie(category_df, values='amount', names='category',
                          title="Expenses by Category")
            st.plotly_chart(fig2, use_container_width=True)
            
            # Display recent transactions
            transactions = finance_manager.get_transactions(st.session_state.user_id)
            recent_transactions = sorted(transactions, key=lambda x: x['date'], reverse=True)[:5]
            
            st.subheader("Recent Transactions")
            for tx in recent_transactions:
                tx_type = "Income" if tx['is_income'] else "Expense"
                amount = f"${tx['amount']:.2f}"
                st.write(f"**{tx['date']}** - {tx['category']} ({tx_type}): {amount} - {tx['description']}")
    
    with tab2:
        st.header("Manage Transactions")
        
        # Add new transaction
        st.subheader("Add New Transaction")
        col1, col2 = st.columns(2)
        
        with col1:
            date = st.date_input("Date", value=datetime.now())
            amount = st.number_input("Amount", min_value=0.01, format="%.2f")
            is_income = st.selectbox("Type", options=["Expense", "Income"]) == "Income"
        
        with col2:
            categories = ["Food", "Rent", "Transportation", "Entertainment", "Utilities", 
                         "Education", "Healthcare", "Clothing", "Salary", "Gift", "Other"]
            category = st.selectbox("Category", options=categories)
            description = st.text_input("Description")
        
        if st.button("Add Transaction"):
            finance_manager.add_transaction(
                st.session_state.user_id,
                date,
                amount,
                category,
                description,
                is_income
            )
            st.success("Transaction added successfully!")
            st.rerun()
        
        # View transactions
        st.subheader("Transaction History")
        transactions = finance_manager.get_transactions(st.session_state.user_id)
        
        if not transactions:
            st.info("No transactions found.")
        else:
            df = pd.DataFrame(transactions)
            df['date'] = pd.to_datetime(df['date'])
            df = df.sort_values('date', ascending=False)
            
            df['type'] = df['is_income'].apply(lambda x: "Income" if x else "Expense")
            df['amount'] = df['amount'].apply(lambda x: f"${x:.2f}")
            
            st.dataframe(df[['date', 'type', 'category', 'amount', 'description']])
    
    with tab3:
        st.header("Budget Management")
        
        # Set budget
        st.subheader("Set Budget")
        col1, col2 = st.columns(2)
        
        with col1:
            categories = ["Food", "Rent", "Transportation", "Entertainment", "Utilities", 
                         "Education", "Healthcare", "Clothing", "Other"]
            budget_category = st.selectbox("Category", options=categories)
            budget_amount = st.number_input("Monthly Budget Amount", min_value=0.01, format="%.2f")
        
        with col2:
            budget_period = st.selectbox("Period", options=["Monthly", "Yearly"])
        
        if st.button("Set Budget"):
            finance_manager.set_budget(
                st.session_state.user_id,
                budget_category,
                budget_amount,
                budget_period
            )
            st.success("Budget set successfully!")
            st.rerun()
        
        # View budgets
        st.subheader("Current Budgets")
        budgets = finance_manager.get_budgets(st.session_state.user_id)
        
        if not budgets:
            st.info("No budgets set.")
        else:
            budget_df = pd.DataFrame(budgets)
            
            # Get actual spending
            transactions = finance_manager.get_transactions(st.session_state.user_id)
            tx_df = pd.DataFrame(transactions)
            
            if not tx_df.empty:
                tx_df['date'] = pd.to_datetime(tx_df['date'])
                current_month = datetime.now().month
                current_year = datetime.now().year
                
                # Filter for current month and expenses
                current_month_expenses = tx_df[
                    (tx_df['date'].dt.month == current_month) & 
                    (tx_df['date'].dt.year == current_year) & 
                    (tx_df['is_income'] == 0)
                ]
                
                # Group by category
                if not current_month_expenses.empty:
                    category_spending = current_month_expenses.groupby('category')['amount'].sum().reset_index()
                    
                    # Merge with budgets
                    budget_df = pd.merge(
                        budget_df,
                        category_spending,
                        on='category',
                        how='left'
                    )
                    
                    budget_df['amount_spent'] = budget_df['amount_y'].fillna(0)
                    budget_df['budget'] = budget_df['amount_x']
                    budget_df['remaining'] = budget_df['budget'] - budget_df['amount_spent']
                    budget_df['percent_used'] = (budget_df['amount_spent'] / budget_df['budget'] * 100).round(2)
                    
                    # Create progress bars
                    for _, row in budget_df.iterrows():
                        st.write(f"**{row['category']}**: ${row['amount_spent']:.2f} of ${row['budget']:.2f} ({row['percent_used']}%)")
                        progress = min(100, row['percent_used']) / 100
                        st.progress(progress)
                else:
                    for _, row in budget_df.iterrows():
                        st.write(f"**{row['category']}**: $0.00 of ${row['amount']:.2f} (0%)")
                        st.progress(0)
            else:
                for _, row in budget_df.iterrows():
                    st.write(f"**{row['category']}**: $0.00 of ${row['amount']:.2f} (0%)")
                    st.progress(0)
    
    with tab4:
        st.header("Expense Predictions")
        
        # Generate predictions
        predictions = finance_manager.predict_expenses(st.session_state.user_id)
        
        if not predictions:
            st.info("Not enough data for predictions. Please add more transactions.")
        else:
            # Total monthly predictions
            st.subheader("Predicted Monthly Expenses")
            monthly_pred_df = pd.DataFrame(predictions['total_by_month'])
            monthly_pred_df['month_year'] = monthly_pred_df.apply(
                lambda x: f"{x['year']}-{x['month']:02d}", axis=1
            )
            
            fig3 = px.bar(
                monthly_pred_df, 
                x='month_year', 
                y='predicted_amount',
                title="Predicted Monthly Expenses",
                labels={'predicted_amount': 'Amount', 'month_year': 'Month'}
            )
            st.plotly_chart(fig3, use_container_width=True)
            
            # Category predictions
            st.subheader("Predicted Expenses by Category")
            category_pred_df = pd.DataFrame(predictions['by_category'])
            
            # Group by category
            category_totals = category_pred_df.groupby('category')['predicted_amount'].sum().reset_index()
            
            fig4 = px.pie(
                category_totals,
                values='predicted_amount',
                names='category',
                title="Predicted Expenses by Category"
            )
            st.plotly_chart(fig4, use_container_width=True)
            
            # Financial insights
            st.subheader("Financial Insights")
            
            # Calculate total predicted expenses
            total_predicted = monthly_pred_df['predicted_amount'].sum()
            
            # Get average monthly income
            transactions = finance_manager.get_transactions(st.session_state.user_id)
            tx_df = pd.DataFrame(transactions)
            
            if not tx_df.empty:
                tx_df['date'] = pd.to_datetime(tx_df['date'])
                income_df = tx_df[tx_df['is_income'] == 1]
                
                if not income_df.empty:
                    income_df['month_year'] = income_df['date'].dt.strftime('%Y-%m')
                    monthly_income = income_df.groupby('month_year')['amount'].sum()
                    avg_monthly_income = monthly_income.mean()
                    
                    # Calculate predicted savings
                    avg_monthly_expenses = total_predicted / len(monthly_pred_df)
                    predicted_savings = avg_monthly_income - avg_monthly_expenses
                    
                    st.write(f"**Average Monthly Income:** ${avg_monthly_income:.2f}")
                    st.write(f"**Predicted Average Monthly Expenses:** ${avg_monthly_expenses:.2f}")
                    
                    if predicted_savings > 0:
                        st.write(f"**Predicted Monthly Savings:** ${predicted_savings:.2f}")
                        st.write(f"**Projected Annual Savings:** ${predicted_savings * 12:.2f}")
                    else:
                        st.warning(f"**Predicted Monthly Deficit:** ${-predicted_savings:.2f}")
                        st.write("Consider reducing expenses in the following categories:")
                        
                        # Suggest categories to reduce
                        top_categories = category_totals.sort_values('predicted_amount', ascending=False).head(3)
                        for _, row in top_categories.iterrows():
                            st.write(f"- {row['category']}: ${row['predicted_amount']:.2f}")
                else:
                    st.info("No income data available for savings prediction.")
            else:
                st.info("No transaction data available for financial insights.")

if __name__ == "__main__":
    main()
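
To try it, save the file and launch it with Streamlit (streamlit run finance_manager.py); the SQLite database is created automatically on first run.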

4. Sustainable Campus Smart Grid Simulation


Difficulty Level: Advanced

Skills Developed: IoT, Data Simulation, Algorithm Design, System Modeling

Project Description: Create a simulation of a smart grid system for a college campus that optimizes energy usage, incorporates renewable energy sources, and reduces overall energy costs. This project combines IoT concepts with algorithmic optimization to create a sustainable energy management system.

Key Features:

  • Energy consumption simulation
  • Renewable energy integration modeling
  • Peak load prediction and management
  • Cost optimization algorithms
  • Real-time monitoring dashboard
  • “What-if” scenario analysis

Implementation Steps:

  1. Research smart grid systems and sustainable energy concepts
  2. Design the simulation architecture
  3. Implement energy consumption models
  4. Develop optimization algorithms (see the dispatch sketch after this list)
  5. Create a visualization dashboard using Dash or Streamlit
  6. Test with different scenarios to validate the model
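
For step 4, the sample below uses a simple rule-based dispatch, but the same decision can be posed as a small optimization problem. A minimal sketch with scipy.optimize.minimize that splits an energy deficit between grid and battery at least cost (the 0.01 battery-wear coefficient is an assumed tuning value):

# dispatch_opt.py - cost-optimal split of a deficit between grid and battery
from scipy.optimize import minimize

def optimal_dispatch(load_kw, renewable_kw, battery_soc_kwh,
                     grid_price=0.12, max_discharge_kw=200):
    deficit = max(0.0, load_kw - renewable_kw)

    # x = [grid_kw, battery_kw]; grid energy costs money, battery use gets a
    # small wear penalty so the optimizer does not treat it as free
    cost = lambda x: x[0] * grid_price + 0.01 * x[1]

    constraints = [{"type": "eq", "fun": lambda x: x[0] + x[1] - deficit}]
    bounds = [(0, None), (0, min(max_discharge_kw, battery_soc_kwh))]

    result = minimize(cost, x0=[deficit, 0.0], bounds=bounds, constraints=constraints)
    grid_kw, battery_kw = result.x
    return grid_kw, battery_kw

print(optimal_dispatch(load_kw=800, renewable_kw=350, battery_soc_kwh=400))
# Expected: about (250.0, 200.0) - the battery covers what it can, grid covers the rest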

Sample Code Implementation:

# smart_grid_simulation.py
import numpy as np
import pandas as pd
import datetime
import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.graph_objects as go
import plotly.express as px
from scipy.optimize import minimize
import json
import os

class SmartGridSimulation:
    def __init__(self, config_file=None):
        # Load configuration or use defaults
        if config_file and os.path.exists(config_file):
            with open(config_file, 'r') as f:
                self.config = json.load(f)
        else:
            self.config = {
                'campus': {
                    'buildings': [
                        {'name': 'Academic Building', 'type': 'academic', 'size': 10000, 'peak_load': 500},
                        {'name': 'Dormitory A', 'type': 'residential', 'size': 5000, 'peak_load': 300},
                        {'name': 'Dormitory B', 'type': 'residential', 'size': 5000, 'peak_load': 300},
                        {'name': 'Sports Complex', 'type': 'recreational', 'size': 8000, 'peak_load': 400},
                        {'name': 'Library', 'type': 'academic', 'size': 7000, 'peak_load': 350},
                        {'name': 'Admin Building', 'type': 'administrative', 'size': 3000, 'peak_load': 200},
                        {'name': 'Cafeteria', 'type': 'food_service', 'size': 2000, 'peak_load': 250},
                    ]
                },
                'energy_sources': {
                    'grid': {'max_capacity': 2000, 'cost_per_kwh': 0.12},
                    'solar': {'capacity': 500, 'installation_area': 2000, 'efficiency': 0.2, 'cost_per_kwh': 0.0},
                    'wind': {'capacity': 300, 'turbines': 3, 'efficiency': 0.35, 'cost_per_kwh': 0.0},
                    'battery_storage': {'capacity': 1000, 'max_charge_rate': 100, 'max_discharge_rate': 200, 'efficiency': 0.9}
                },
                'simulation': {
                    'time_step': 60,  # minutes
                    'duration': 7,    # days
                    'start_date': '2025-03-01'
                }
            }
        
        # Initialize simulation state
        self.current_time = pd.to_datetime(self.config['simulation']['start_date'])
        self.end_time = self.current_time + pd.Timedelta(days=self.config['simulation']['duration'])
        self.time_step = pd.Timedelta(minutes=self.config['simulation']['time_step'])
        
        # Battery state of charge (SOC) starts at 50%
        self.battery_soc = 0.5 * self.config['energy_sources']['battery_storage']['capacity']
        
        # Simulation results storage
        self.results = {
            'timestamp': [],
            'building_load': {},
            'total_load': [],
            'solar_generation': [],
            'wind_generation': [],
            'grid_consumption': [],
            'battery_charge': [],
            'battery_discharge': [],
            'battery_soc': [],
            'total_cost': [],
            'co2_emissions': []
        }
        
        # Initialize building loads
        for building in self.config['campus']['buildings']:
            self.results['building_load'][building['name']] = []
        
        # Weather data - for a real project, you might use an API or historical data
        self.generate_weather_data()
    
    def generate_weather_data(self):
        # Generate synthetic weather data for the simulation period
        timestamps = pd.date_range(
            start=self.current_time,
            end=self.end_time,
            freq=self.time_step
        )
        
        # Solar irradiance (W/m²) - follows daily pattern
        base_irradiance = np.zeros(len(timestamps))
        for i, ts in enumerate(timestamps):
            hour = ts.hour
            # Daylight hours (6 AM to 6 PM)
            if 6 <= hour < 18:
                # Peak at noon
                peak_factor = 1 - abs(hour - 12) / 6
                base_irradiance[i] = 1000 * peak_factor
        
        # Add some randomness for cloud cover
        cloud_factor = 0.7 + 0.3 * np.random.random(len(timestamps))
        solar_irradiance = base_irradiance * cloud_factor
        
        # Wind speed (m/s) - more random
        wind_base = 5 + 3 * np.sin(np.linspace(0, 4*np.pi, len(timestamps)))
        wind_random = 2 * np.random.random(len(timestamps))
        wind_speed = wind_base + wind_random
        
        # Temperature (°C) - follows daily pattern
        temp_base = np.zeros(len(timestamps))
        for i, ts in enumerate(timestamps):
            hour = ts.hour
            day_temp = 15 + 5 * np.sin(np.pi * (ts.day_of_year % 365) / 182.5)  # Seasonal variation
            hour_offset = 5 * np.sin(np.pi * (hour - 14) / 12)  # Daily variation (peak at 2 PM)
            temp_base[i] = day_temp + hour_offset
        
        temp_random = 2 * (np.random.random(len(timestamps)) - 0.5)
        temperature = temp_base + temp_random
        
        # Store weather data
        self.weather_data = pd.DataFrame({
            'timestamp': timestamps,
            'solar_irradiance': solar_irradiance,
            'wind_speed': wind_speed,
            'temperature': temperature
        })
    
    def calculate_building_load(self, building, timestamp):
        """Calculate the energy load for a building at a given time."""
        hour = timestamp.hour
        day_of_week = timestamp.dayofweek
        
        # Base load as percentage of peak load
        if building['type'] == 'academic':
            if day_of_week < 5:  # Weekday
                if 8 <= hour < 18:  # Working hours
                    base_load_pct = 0.7 + 0.3 * np.random.random()
                elif 18 <= hour < 22:  # Evening classes
                    base_load_pct = 0.4 + 0.2 * np.random.random()
                else:  # Night hours
                    base_load_pct = 0.2 + 0.1 * np.random.random()
            else:  # Weekend
                if 10 <= hour < 16:  # Weekend activity
                    base_load_pct = 0.3 + 0.2 * np.random.random()
                else:
                    base_load_pct = 0.1 + 0.1 * np.random.random()
        
        elif building['type'] == 'residential':
            if 7 <= hour < 9 or 17 <= hour < 23:  # Morning/Evening peak
                base_load_pct = 0.8 + 0.2 * np.random.random()
            elif 23 <= hour or hour < 7:  # Night
                base_load_pct = 0.4 + 0.1 * np.random.random()
            else:  # Day
                base_load_pct = 0.3 + 0.2 * np.random.random()
        
        elif building['type'] == 'recreational':
            if day_of_week < 5:  # Weekday
                if 16 <= hour < 21:  # After classes
                    base_load_pct = 0.7 + 0.3 * np.random.random()
                elif 9 <= hour < 16:  # During day
                    base_load_pct = 0.4 + 0.2 * np.random.random()
                else:
                    base_load_pct = 0.1 + 0.1 * np.random.random()
            else:  # Weekend
                if 10 <= hour < 20:  # Weekend activity
                    base_load_pct = 0.8 + 0.2 * np.random.random()
                else:
                    base_load_pct = 0.1 + 0.1 * np.random.random()
        
        elif building['type'] == 'food_service':
            if 7 <= hour < 10 or 11 <= hour < 14 or 17 <= hour < 20:  # Meal times
                base_load_pct = 0.9 + 0.1 * np.random.random()
            elif 6 <= hour < 7 or 10 <= hour < 11 or 14 <= hour < 17 or 20 <= hour < 21:  # Prep times
                base_load_pct = 0.6 + 0.2 * np.random.random()
            else:
                base_load_pct = 0.2 + 0.1 * np.random.random()
        
        else:  # Administrative and others
            if day_of_week < 5:  # Weekday
                if 8 <= hour < 17:  # Working hours
                    base_load_pct = 0.7 + 0.2 * np.random.random()
                else:
                    base_load_pct = 0.1 + 0.1 * np.random.random()
            else:  # Weekend
                base_load_pct = 0.1 + 0.1 * np.random.random()
        
        # Adjust for temperature effect on HVAC
        weather_at_time = self.weather_data[self.weather_data['timestamp'] == timestamp]
        if not weather_at_time.empty:
            temp = weather_at_time.iloc[0]['temperature']
            # HVAC load increases as temperature deviates from comfort zone (20-22°C)
            temp_effect = 0.1 * max(0, abs(temp - 21) - 2) / 10
            base_load_pct += temp_effect
        
        return building['peak_load'] * min(1.0, base_load_pct)
    
    def calculate_renewable_generation(self, timestamp):
        """Calculate renewable energy generation based on weather conditions."""
        weather_at_time = self.weather_data[self.weather_data['timestamp'] == timestamp]
        
        if weather_at_time.empty:
            return 0, 0
        
        weather = weather_at_time.iloc[0]
        
        # Solar generation
        solar_config = self.config['energy_sources']['solar']
        solar_irradiance = weather['solar_irradiance']  # W/m²
        solar_generation = (
            solar_irradiance * solar_config['installation_area'] * 
            solar_config['efficiency'] / 1000  # Convert to kW
        )
        
        # Wind generation
        wind_config = self.config['energy_sources']['wind']
        wind_speed = weather['wind_speed']  # m/s
        
        # Simple wind turbine model
        # No generation below cut-in speed (3 m/s) or above cut-out speed (25 m/s)
        if wind_speed < 3 or wind_speed > 25:
            wind_generation = 0
        else:
            # Simplified power curve
            if wind_speed < 12:
                # Ramp up from cut-in to rated speed
                power_factor = (wind_speed - 3) / 9
            else:
                # Rated power from rated speed to cut-out
                power_factor = 1.0
            
            wind_generation = (
                wind_config['capacity'] * wind_config['turbines'] * 
                power_factor * wind_config['efficiency']
            )
        
        return solar_generation, wind_generation
    
    def optimize_energy_dispatch(self, total_load, solar_gen, wind_gen):
        """Optimize the energy dispatch between different sources."""
        renewable_gen = solar_gen + wind_gen
        battery_config = self.config['energy_sources']['battery_storage']
        
        # If renewables exceed load, charge battery with excess
        if renewable_gen >= total_load:
            excess_energy = renewable_gen - total_load
            battery_charge = min(
                excess_energy,
                battery_config['max_charge_rate'],
                battery_config['capacity'] - self.battery_soc
            )
            battery_discharge = 0
            grid_consumption = 0
        else:
            # Need additional power
            energy_deficit = total_load - renewable_gen
            
            # Optimize between grid and battery discharge
            # Simple strategy: Use battery first if SOC is good, then grid
            if self.battery_soc > 0.2 * battery_config['capacity']:
                battery_discharge = min(
                    energy_deficit,
                    battery_config['max_discharge_rate'],
                    self.battery_soc
                )
                grid_consumption = energy_deficit - battery_discharge
            else:
                # Battery SOC too low, use grid instead
                battery_discharge = 0
                grid_consumption = energy_deficit
            
            battery_charge = 0
        
        # Update battery SOC, clamped to the physical range [0, capacity]
        # (discharging draws SOC down by discharge/efficiency, which could
        # otherwise dip slightly below zero)
        self.battery_soc = min(
            battery_config['capacity'],
            max(
                0.0,
                self.battery_soc +
                battery_charge * battery_config['efficiency'] -
                battery_discharge / battery_config['efficiency']
            )
        )
        
        # Calculate cost
        grid_cost = grid_consumption * self.config['energy_sources']['grid']['cost_per_kwh']
        
        # Calculate CO2 emissions (typical grid emissions factor: 0.5 kg CO2/kWh)
        co2_emissions = grid_consumption * 0.5  # kg CO2
        
        return {
            'grid_consumption': grid_consumption,
            'battery_charge': battery_charge,
            'battery_discharge': battery_discharge,
            'battery_soc': self.battery_soc,
            'cost': grid_cost,
            'co2_emissions': co2_emissions
        }
    
    def run_simulation_step(self):
        """Run a single step of the simulation."""
        # Calculate building loads
        building_loads = {}
        for building in self.config['campus']['buildings']:
            load = self.calculate_building_load(building, self.current_time)
            building_loads[building['name']] = load
        
        total_load = sum(building_loads.values())
        
        # Calculate renewable generation
        solar_gen, wind_gen = self.calculate_renewable_generation(self.current_time)
        
        # Optimize energy dispatch
        dispatch = self.optimize_energy_dispatch(total_load, solar_gen, wind_gen)
        
        # Store results
        self.results['timestamp'].append(self.current_time)
        for building_name, load in building_loads.items():
            self.results['building_load'][building_name].append(load)
        
        self.results['total_load'].append(total_load)
        self.results['solar_generation'].append(solar_gen)
        self.results['wind_generation'].append(wind_gen)
        self.results['grid_consumption'].append(dispatch['grid_consumption'])
        self.results['battery_charge'].append(dispatch['battery_charge'])
        self.results['battery_discharge'].append(dispatch['battery_discharge'])
        self.results['battery_soc'].append(dispatch['battery_soc'])
        self.results['total_cost'].append(dispatch['cost'])
        self.results['co2_emissions'].append(dispatch['co2_emissions'])
        
        # Advance time
        self.current_time += self.time_step
    
    def run_simulation(self):
        """Run the complete simulation."""
        while self.current_time <= self.end_time:
            self.run_simulation_step()
    
    def get_results_df(self):
        """Convert results to a pandas DataFrame."""
        df = pd.DataFrame({
            'timestamp': self.results['timestamp'],
            'total_load': self.results['total_load'],
            'solar_generation': self.results['solar_generation'],
            'wind_generation': self.results['wind_generation'],
            'grid_consumption': self.results['grid_consumption'],
            'battery_charge': self.results['battery_charge'],
            'battery_discharge': self.results['battery_discharge'],
            'battery_soc': self.results['battery_soc'],
            'total_cost': self.results['total_cost'],
            'co2_emissions': self.results['co2_emissions']
        })
        
        # Add building loads
        for building_name, loads in self.results['building_load'].items():
            df[f'load_{building_name}'] = loads
        
        return df
    
    def run_scenario(self, scenario_config):
        """Run a what-if scenario with modified parameters."""
        # Deep-copy the current configuration; a shallow .copy() would share
        # the nested dicts, so scenario edits would leak into the backup
        import copy
        original_config = copy.deepcopy(self.config)
        
        # Merge scenario parameters recursively so untouched nested keys
        # (e.g. solar 'efficiency') are preserved rather than overwritten
        def deep_update(target, updates):
            for key, value in updates.items():
                if isinstance(value, dict) and isinstance(target.get(key), dict):
                    deep_update(target[key], value)
                else:
                    target[key] = value
        
        deep_update(self.config, scenario_config)
        
        # Re-initialize simulation state and rerun; config_file=None is assumed
        # to keep the (now modified) self.config rather than reloading defaults
        self.__init__(config_file=None)
        self.run_simulation()
        
        # Get results
        results = self.get_results_df()
        
        # Restore original configuration
        self.config = original_config
        
        return results

# Dashboard Application
def create_dashboard(simulation):
    app = dash.Dash(__name__, title="Campus Smart Grid Simulation")
    
    # Run initial simulation
    simulation.run_simulation()
    results_df = simulation.get_results_df()
    
    # Get building names for dropdown
    building_names = [building['name'] for building in simulation.config['campus']['buildings']]
    
    app.layout = html.Div([
        html.H1("Sustainable Campus Smart Grid Simulation", 
                style={'textAlign': 'center', 'color': '#2a3f5f', 'marginBottom': 20}),
        
        html.Div([
            html.Div([
                html.H3("Simulation Controls", style={'color': '#2a3f5f'}),
                html.Label("Simulation Duration (days):"),
                dcc.Slider(
                    id='duration-slider',
                    min=1,
                    max=30,
                    value=simulation.config['simulation']['duration'],
                    marks={i: f'{i}' for i in range(0, 31, 5)},
                    step=1
                ),
                
                html.Label("Solar Capacity (kW):"),
                dcc.Slider(
                    id='solar-slider',
                    min=0,
                    max=1000,
                    value=simulation.config['energy_sources']['solar']['capacity'],
                    marks={i: f'{i}' for i in range(0, 1001, 200)},
                    step=50
                ),
                
                html.Label("Wind Capacity per Turbine (kW):"),
                dcc.Slider(
                    id='wind-slider',
                    min=0,
                    max=300,
                    value=simulation.config['energy_sources']['wind']['capacity'],
                    marks={i: f'{i}' for i in range(0, 301, 50)},
                    step=10
                ),
                
                html.Label("Number of Wind Turbines:"),
                dcc.Slider(
                    id='turbines-slider',
                    min=0,
                    max=10,
                    value=simulation.config['energy_sources']['wind']['turbines'],
                    marks={i: f'{i}' for i in range(0, 11)},
                    step=1
                ),
                
                html.Label("Battery Storage Capacity (kWh):"),
                dcc.Slider(
                    id='battery-slider',
                    min=0,
                    max=2000,
                    value=simulation.config['energy_sources']['battery_storage']['capacity'],
                    marks={i: f'{i}' for i in range(0, 2001, 500)},
                    step=100
                ),
                
                html.Button('Run Simulation', id='run-button', 
                           style={'backgroundColor': '#4CAF50', 'color': 'white', 
                                  'padding': '10px 20px', 'margin': '20px 0',
                                  'border': 'none', 'borderRadius': '4px', 
                                  'cursor': 'pointer'}),
            ], style={'width': '30%', 'display': 'inline-block', 'padding': '20px', 
                      'backgroundColor': '#f8f9fa', 'borderRadius': '10px',
                      'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)'}),
            
            html.Div([
                html.H3("Simulation Results", style={'color': '#2a3f5f'}),
                dcc.Tabs([
                    dcc.Tab(label="Energy Overview", children=[
                        dcc.Graph(id='energy-overview-graph')
                    ]),
                    dcc.Tab(label="Building Loads", children=[
                        html.Label("Select Building:"),
                        dcc.Dropdown(
                            id='building-dropdown',
                            options=[{'label': name, 'value': name} for name in building_names],
                            value=building_names[0]
                        ),
                        dcc.Graph(id='building-load-graph')
                    ]),
                    dcc.Tab(label="Battery State", children=[
                        dcc.Graph(id='battery-graph')
                    ]),
                    dcc.Tab(label="Cost & Emissions", children=[
                        dcc.Graph(id='cost-emissions-graph')
                    ])
                ])
            ], style={'width': '65%', 'display': 'inline-block', 'float': 'right', 
                      'padding': '20px', 'backgroundColor': '#f8f9fa', 
                      'borderRadius': '10px', 'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)'})
        ]),
        
        html.Div([
            html.H3("Summary Statistics", style={'color': '#2a3f5f', 'marginTop': '30px'}),
            html.Div(id='summary-stats', style={'padding': '15px', 'backgroundColor': '#e9ecef', 
                                               'borderRadius': '5px', 'marginTop': '10px'})
        ], style={'marginTop': '30px', 'clear': 'both', 'padding': '20px', 
                  'backgroundColor': '#f8f9fa', 'borderRadius': '10px',
                  'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)'})
    ], style={'maxWidth': '1200px', 'margin': '0 auto', 'padding': '20px'})
    
    @app.callback(
        [Output('energy-overview-graph', 'figure'),
         Output('building-load-graph', 'figure'),
         Output('battery-graph', 'figure'),
         Output('cost-emissions-graph', 'figure'),
         Output('summary-stats', 'children')],
        [Input('run-button', 'n_clicks'),
         Input('building-dropdown', 'value')],
        [State('duration-slider', 'value'),
         State('solar-slider', 'value'),
         State('wind-slider', 'value'),
         State('turbines-slider', 'value'),
         State('battery-slider', 'value')]
    )
    def update_graphs(n_clicks, selected_building, duration, solar_capacity, 
                     wind_capacity, num_turbines, battery_capacity):
        nonlocal results_df
        
        # Rerun the simulation only when the Run button triggered this callback;
        # the building dropdown also fires it but should only redraw the graphs
        ctx = dash.callback_context
        if ctx.triggered and ctx.triggered[0]['prop_id'].startswith('run-button'):
            scenario_config = {
                'simulation': {
                    'duration': duration
                },
                'energy_sources': {
                    'solar': {
                        'capacity': solar_capacity
                    },
                    'wind': {
                        'capacity': wind_capacity,
                        'turbines': num_turbines
                    },
                    'battery_storage': {
                        'capacity': battery_capacity
                    }
                }
            }
            
            results_df = simulation.run_scenario(scenario_config)
        
        # Energy Overview Graph
        energy_fig = go.Figure()
        
        energy_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df['total_load'],
            name='Total Load', line=dict(color='red', width=2)
        ))
        
        energy_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df['solar_generation'],
            name='Solar Generation', line=dict(color='orange', width=2)
        ))
        
        energy_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df['wind_generation'],
            name='Wind Generation', line=dict(color='blue', width=2)
        ))
        
        energy_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df['grid_consumption'],
            name='Grid Consumption', line=dict(color='purple', width=2)
        ))
        
        energy_fig.update_layout(
            title='Energy Overview',
            xaxis_title='Time',
            yaxis_title='Power (kW)',
            legend=dict(orientation='h', y=1.1),
            height=400
        )
        
        # Building Load Graph
        building_fig = go.Figure()
        
        building_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df[f'load_{selected_building}'],
            name=f'{selected_building} Load', line=dict(color='green', width=2)
        ))
        
        building_fig.update_layout(
            title=f'{selected_building} Energy Consumption',
            xaxis_title='Time',
            yaxis_title='Power (kW)',
            height=400
        )
        
        # Battery Graph
        battery_fig = go.Figure()
        
        battery_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df['battery_soc'],
            name='Battery State of Charge', line=dict(color='blue', width=2)
        ))
        
        battery_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df['battery_charge'],
            name='Battery Charging', line=dict(color='green', width=2)
        ))
        
        battery_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df['battery_discharge'],
            name='Battery Discharging', line=dict(color='red', width=2)
        ))
        
        battery_fig.update_layout(
            title='Battery Operation',
            xaxis_title='Time',
            yaxis_title='Power (kW) / Energy (kWh)',
            legend=dict(orientation='h', y=1.1),
            height=400
        )
        
        # Cost & Emissions Graph
        cost_emissions_fig = go.Figure()
        
        cost_emissions_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df['total_cost'],
            name='Energy Cost', line=dict(color='green', width=2),
            yaxis='y'
        ))
        
        cost_emissions_fig.add_trace(go.Scatter(
            x=results_df['timestamp'], y=results_df['co2_emissions'],
            name='CO2 Emissions', line=dict(color='gray', width=2),
            yaxis='y2'
        ))
        
        cost_emissions_fig.update_layout(
            title='Cost and Emissions',
            xaxis_title='Time',
            yaxis=dict(title='Cost ($)', side='left', showgrid=False),
            yaxis2=dict(title='CO2 Emissions (kg)', side='right', overlaying='y', showgrid=False),
            legend=dict(orientation='h', y=1.1),
            height=400
        )
        
        # Summary Statistics: convert average power per step (kW) into energy (kWh)
        hours_per_step = simulation.time_step.total_seconds() / 3600
        total_energy_demand = results_df['total_load'].sum() * hours_per_step  # kWh
        total_solar = results_df['solar_generation'].sum() * hours_per_step  # kWh
        total_wind = results_df['wind_generation'].sum() * hours_per_step  # kWh
        total_grid = results_df['grid_consumption'].sum() * hours_per_step  # kWh
        total_cost = results_df['total_cost'].sum()
        total_emissions = results_df['co2_emissions'].sum()
        
        renewable_percentage = ((total_solar + total_wind) / total_energy_demand) * 100 if total_energy_demand > 0 else 0
        
        summary_stats = html.Div([
            html.Div([
                html.Div([
                    html.H4("Energy Demand"),
                    html.P(f"{total_energy_demand:.2f} kWh")
                ], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'}),
                
                html.Div([
                    html.H4("Renewable Energy"),
                    html.P(f"{total_solar + total_wind:.2f} kWh ({renewable_percentage:.1f}%)")
                ], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'}),
                
                html.Div([
                    html.H4("Grid Energy"),
                    html.P(f"{total_grid:.2f} kWh")
                ], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'}),
                
                html.Div([
                    html.H4("Total Cost"),
                    html.P(f"${total_cost:.2f}")
                ], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'}),
                
                html.Div([
                    html.H4("CO2 Emissions"),
                    html.P(f"{total_emissions:.2f} kg")
                ], style={'width': '20%', 'display': 'inline-block', 'textAlign': 'center'})
            ])
        ])
        
        return energy_fig, building_fig, battery_fig, cost_emissions_fig, summary_stats
    
    return app

if __name__ == "__main__":
    # Create simulation
    simulation = SmartGridSimulation()
    
    # Create and run dashboard
    app = create_dashboard(simulation)
    app.run(debug=True)  # on older Dash releases use app.run_server(debug=True)
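
If you want to exercise the dispatch logic without launching the dashboard, a short headless run works as a sanity check. The snippet below is a minimal sketch, not part of the project code: it assumes the SmartGridSimulation class above is importable (the module name smart_grid_simulation is a placeholder) and that run_scenario behaves as defined above.

# headless_run.py -- minimal sketch; the module name is a placeholder
from smart_grid_simulation import SmartGridSimulation

sim = SmartGridSimulation()
sim.run_simulation()
baseline = sim.get_results_df()

# What-if scenario: double the installed solar capacity
scenario = {
    'energy_sources': {
        'solar': {'capacity': 2 * sim.config['energy_sources']['solar']['capacity']}
    }
}
scenario_df = sim.run_scenario(scenario)

# Convert average kW per step into kWh before comparing grid reliance
hours_per_step = sim.time_step.total_seconds() / 3600
print(f"Baseline grid energy: {baseline['grid_consumption'].sum() * hours_per_step:.1f} kWh")
print(f"Scenario grid energy: {scenario_df['grid_consumption'].sum() * hours_per_step:.1f} kWh")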


5. Augmented Reality Campus Navigation System

A futuristic university campus with an Augmented Reality (AR) navigation system. Students use AR glasses and smartphones to view interactive campus maps, real-time directions, and building information displayed as holograms. The scene includes digital signposts, floating icons for classrooms, libraries, and cafeterias, and a tech-enhanced environment with smart pathways. The background features a modern campus with high-tech infrastructure.

Difficulty Level: Advanced

Skills Developed: Computer Vision, AR Development, Geospatial Programming, Mobile Integration

Project Description: Build an augmented reality application that helps students navigate their college campus. This system overlays directional information, points of interest, and real-time data (like crowdedness of study spaces) onto a live camera view, creating an intuitive navigation experience.

Key Features:

  • AR-based navigation with directional arrows
  • Building and room identification
  • Shortest path algorithms
  • Points of interest with information overlays
  • Real-time updates on available facilities
  • Indoor positioning system

Implementation Steps:

  1. Set up the AR development environment (ARCore or ARKit integration)
  2. Create the campus map database with GPS coordinates
  3. Implement computer vision for landmark recognition
  4. Develop pathfinding algorithms
  5. Build the AR interface with information overlays
  6. Create a backend server for real-time data

Sample Code Implementation:

# campus_ar_navigation.py
from flask import Flask, request, jsonify
import numpy as np
import cv2
import json
import networkx as nx
import math
import sqlite3
import datetime
import os
from geopy.distance import geodesic

app = Flask(__name__)

# Database setup
def init_db():
    conn = sqlite3.connect('campus_navigation.db')
    cursor = conn.cursor()
    
    # Create buildings table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS buildings (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        description TEXT,
        latitude REAL NOT NULL,
        longitude REAL NOT NULL,
        altitude REAL,
        image_path TEXT,
        building_type TEXT,
        floors INTEGER DEFAULT 1
    )
    ''')
    
    # Create rooms table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS rooms (
        id INTEGER PRIMARY KEY,
        building_id INTEGER,
        room_number TEXT NOT NULL,
        name TEXT,
        floor INTEGER,
        latitude REAL,
        longitude REAL,
        capacity INTEGER,
        room_type TEXT,
        FOREIGN KEY (building_id) REFERENCES buildings (id)
    )
    ''')
    
    # Create points of interest table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS points_of_interest (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        description TEXT,
        poi_type TEXT,
        latitude REAL NOT NULL,
        longitude REAL NOT NULL,
        altitude REAL,
        building_id INTEGER,
        image_path TEXT,
        FOREIGN KEY (building_id) REFERENCES buildings (id)
    )
    ''')
    
    # Create paths table for navigation
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS paths (
        id INTEGER PRIMARY KEY,
        start_point_id INTEGER NOT NULL,
        end_point_id INTEGER NOT NULL,
        path_type TEXT,
        distance REAL,
        indoor BOOLEAN,
        accessible BOOLEAN DEFAULT 1
    )
    ''')
    
    # Create facility status table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS facility_status (
        id INTEGER PRIMARY KEY,
        facility_id INTEGER NOT NULL,
        status TEXT,
        occupancy_count INTEGER,
        last_updated TIMESTAMP,
        FOREIGN KEY (facility_id) REFERENCES points_of_interest (id)
    )
    ''')
    
    conn.commit()
    conn.close()

# Initialize database
init_db()

# Load sample data for testing
def load_sample_data():
    conn = sqlite3.connect('campus_navigation.db')
    cursor = conn.cursor()
    
    # Check if data already exists
    cursor.execute("SELECT COUNT(*) FROM buildings")
    if cursor.fetchone()[0] > 0:
        conn.close()
        return
    
    # Sample buildings
    buildings = [
        (1, "Main Academic Building", "Central academic building with classrooms and offices", 40.7128, -74.0060, 10, "main_academic.jpg", "academic", 4),
        (2, "Science Center", "Houses labs and research facilities", 40.7130, -74.0065, 8, "science_center.jpg", "academic", 3),
        (3, "Student Union", "Student services and recreation", 40.7125, -74.0055, 5, "student_union.jpg", "services", 2),
        (4, "Library", "Main campus library", 40.7127, -74.0050, 12, "library.jpg", "academic", 5),
        (5, "Dormitory A", "Undergraduate housing", 40.7135, -74.0070, 15, "dorm_a.jpg", "residential", 8),
        (6, "Sports Complex", "Athletic facilities and gym", 40.7120, -74.0080, 3, "sports.jpg", "recreational", 2)
    ]
    
    cursor.executemany('''
    INSERT INTO buildings (id, name, description, latitude, longitude, altitude, image_path, building_type, floors)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', buildings)
    
    # Sample rooms
    rooms = [
        (1, 1, "101", "Lecture Hall", 1, 40.7128, -74.0060, 120, "classroom"),
        (2, 1, "201", "Computer Lab", 2, 40.7128, -74.0060, 30, "lab"),
        (3, 2, "105", "Chemistry Lab", 1, 40.7130, -74.0065, 25, "lab"),
        (4, 3, "150", "Cafeteria", 1, 40.7125, -74.0055, 200, "dining"),
        (5, 4, "220", "Study Area", 2, 40.7127, -74.0050, 50, "study"),
        (6, 4, "320", "Quiet Zone", 3, 40.7127, -74.0050, 30, "study")
    ]
    
    cursor.executemany('''
    INSERT INTO rooms (id, building_id, room_number, name, floor, latitude, longitude, capacity, room_type)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', rooms)
    
    # Sample points of interest
    pois = [
        (1, "Main Entrance", "Primary entrance to campus", "entrance", 40.7133, -74.0062, 2, None, "main_entrance.jpg"),
        (2, "Coffee Shop", "Campus coffee shop", "food", 40.7126, -74.0059, 5, 3, "coffee_shop.jpg"),
        (3, "Campus Shuttle Stop", "Shuttle service stop", "transportation", 40.7129, -74.0070, 2, None, "shuttle_stop.jpg"),
        (4, "Info Kiosk", "Information booth", "information", 40.7130, -74.0061, 3, None, "info_kiosk.jpg"),
        (5, "ATM", "Campus ATM", "service", 40.7125, -74.0058, 4, 3, "atm.jpg"),
        (6, "Bicycle Rack", "Bicycle parking", "transportation", 40.7127, -74.0063, 2, None, "bike_rack.jpg")
    ]
    
    cursor.executemany('''
    INSERT INTO points_of_interest (id, name, description, poi_type, latitude, longitude, altitude, building_id, image_path)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', pois)
    
    # Sample paths
    paths = [
        (1, 1, 2, "walkway", 100.5, False, True),
        (2, 1, 3, "walkway", 150.2, False, True),
        (3, 2, 3, "walkway", 120.8, False, True),
        (4, 2, 4, "walkway", 200.3, False, True),
        (5, 3, 4, "walkway", 180.1, False, True),
        (6, 3, 5, "walkway", 250.7, False, True),
        (7, 4, 5, "walkway", 220.4, False, True),
        (8, 5, 6, "walkway", 300.9, False, True)
    ]
    
    cursor.executemany('''
    INSERT INTO paths (id, start_point_id, end_point_id, path_type, distance, indoor, accessible)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    ''', paths)
    
    # Sample facility status
    facility_statuses = [
        (1, 2, "Open", 15, datetime.datetime.now()),
        (2, 3, "Active", 0, datetime.datetime.now()),
        (3, 5, "Available", 0, datetime.datetime.now()),
        (4, 6, "Available", 12, datetime.datetime.now())
    ]
    
    cursor.executemany('''
    INSERT INTO facility_status (id, facility_id, status, occupancy_count, last_updated)
    VALUES (?, ?, ?, ?, ?)
    ''', facility_statuses)
    
    conn.commit()
    conn.close()

# Load sample data
load_sample_data()

# Computer Vision Functions
class LandmarkRecognizer:
    def __init__(self, model_path='landmark_model.h5', landmarks_db='landmarks.json'):
        # In a real implementation, load a trained model
        # For this example, we'll simulate recognition with a simplified approach
        self.landmarks = self._load_landmarks(landmarks_db)
    
    def _load_landmarks(self, landmarks_db):
        # Load landmark data from JSON file or use hardcoded data for example
        if os.path.exists(landmarks_db):
            with open(landmarks_db, 'r') as f:
                return json.load(f)
        else:
            # Sample landmark data for demonstration
            return {
                "building_1": {
                    "features": [0.2, 0.3, 0.1, 0.5, 0.7],  # Simplified feature representation
                    "building_id": 1
                },
                "building_2": {
                    "features": [0.8, 0.2, 0.6, 0.3, 0.1],
                    "building_id": 2
                },
                # Add more landmarks...
            }
    
    def recognize(self, image):
        """
        Recognize landmarks in an image.
        In a real implementation, this would use a trained model.
        For this example, we'll return a random landmark with confidence.
        """
        # Convert image to numpy array if it's not already
        if isinstance(image, str):
            image = cv2.imread(image)
        
        # In a real implementation, process the image and extract features
        # For this example, we'll simulate recognition
        
        # Random landmark with confidence score
        landmarks = list(self.landmarks.keys())
        if not landmarks:
            return None, 0.0
        
        landmark_id = np.random.choice(landmarks)
        confidence = np.random.uniform(0.5, 0.95)
        
        return self.landmarks[landmark_id], confidence

# Navigation System
class CampusNavigator:
    def __init__(self, db_path='campus_navigation.db'):
        self.db_path = db_path
        self.graph = self._build_navigation_graph()
    
    def _build_navigation_graph(self):
        """Build a navigation graph from the paths database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Create a graph
        G = nx.Graph()
        
        # Add nodes (buildings, POIs)
        cursor.execute("SELECT id, latitude, longitude FROM buildings")
        for building_id, lat, lon in cursor.fetchall():
            G.add_node(f"b_{building_id}", pos=(lat, lon), type="building")
        
        cursor.execute("SELECT id, latitude, longitude FROM points_of_interest")
        for poi_id, lat, lon in cursor.fetchall():
            G.add_node(f"p_{poi_id}", pos=(lat, lon), type="poi")
        
        # Add edges (paths)
        cursor.execute("""
        SELECT p.id, p.start_point_id, p.end_point_id, p.distance, p.accessible
        FROM paths p
        """)
        
        for path_id, start_id, end_id, distance, accessible in cursor.fetchall():
            # Determine node types and convert IDs
            start_type = "b" if start_id <= 6 else "p"  # Simplified determination for demo
            end_type = "b" if end_id <= 6 else "p"
            
            start_node = f"{start_type}_{start_id}"
            end_node = f"{end_type}_{end_id}"
            
            if accessible:
                G.add_edge(start_node, end_node, weight=distance, path_id=path_id)
        
        conn.close()
        return G
    
    def find_shortest_path(self, start_coords, end_coords):
        """
        Find the shortest path between two coordinates.
        
        Parameters:
        start_coords (tuple): (latitude, longitude) of starting point
        end_coords (tuple): (latitude, longitude) of destination
        
        Returns:
        list: List of coordinates forming the path
        """
        # Find nearest nodes to coordinates
        start_node = self._find_nearest_node(start_coords)
        end_node = self._find_nearest_node(end_coords)
        
        if not start_node or not end_node:
            return None
        
        try:
            # Find shortest path
            path = nx.shortest_path(self.graph, source=start_node, target=end_node, weight='weight')
            
            # Convert path to coordinates
            path_coords = []
            for node in path:
                node_data = self.graph.nodes[node]
                path_coords.append(node_data['pos'])
            
            return path_coords
        except nx.NetworkXNoPath:
            return None
    
    def _find_nearest_node(self, coords):
        """Find the nearest node to the given coordinates."""
        min_dist = float('inf')
        nearest_node = None
        
        for node, data in self.graph.nodes(data=True):
            node_coords = data.get('pos')
            if node_coords:
                dist = geodesic(coords, node_coords).meters
                if dist < min_dist:
                    min_dist = dist
                    nearest_node = node
        
        return nearest_node
    
    def get_nearby_pois(self, coords, radius=200):
        """Get points of interest within a certain radius (meters)."""
        nearby = []
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Get all POIs
        cursor.execute("""
        SELECT id, name, description, poi_type, latitude, longitude 
        FROM points_of_interest
        """)
        
        for poi_id, name, desc, poi_type, lat, lon in cursor.fetchall():
            poi_coords = (lat, lon)
            dist = geodesic(coords, poi_coords).meters
            
            if dist <= radius:
                # Get current status
                cursor.execute("""
                SELECT status, occupancy_count, last_updated 
                FROM facility_status 
                WHERE facility_id = ?
                """, (poi_id,))
                
                status_row = cursor.fetchone()
                status = None
                if status_row:
                    status = {
                        "status": status_row[0],
                        "occupancy": status_row[1],
                        "last_updated": status_row[2]
                    }
                
                nearby.append({
                    "id": poi_id,
                    "name": name,
                    "description": desc,
                    "type": poi_type,
                    "coordinates": poi_coords,
                    "distance": dist,
                    "status": status
                })
        
        conn.close()
        return sorted(nearby, key=lambda x: x["distance"])
    
    def get_building_info(self, building_id):
        """Get detailed information about a building."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
        SELECT name, description, latitude, longitude, altitude, building_type, floors
        FROM buildings
        WHERE id = ?
        """, (building_id,))
        
        building_data = cursor.fetchone()
        
        if not building_data:
            conn.close()
            return None
        
        name, desc, lat, lon, alt, b_type, floors = building_data
        
        # Get rooms in the building
        cursor.execute("""
        SELECT id, room_number, name, floor, room_type, capacity
        FROM rooms
        WHERE building_id = ?
        """, (building_id,))
        
        rooms = []
        for room_id, room_num, room_name, floor, room_type, capacity in cursor.fetchall():
            rooms.append({
                "id": room_id,
                "number": room_num,
                "name": room_name,
                "floor": floor,
                "type": room_type,
                "capacity": capacity
            })
        
        # Get POIs in the building
        cursor.execute("""
        SELECT id, name, description, poi_type
        FROM points_of_interest
        WHERE building_id = ?
        """, (building_id,))
        
        pois = []
        for poi_id, poi_name, poi_desc, poi_type in cursor.fetchall():
            pois.append({
                "id": poi_id,
                "name": poi_name,
                "description": poi_desc,
                "type": poi_type
            })
        
        conn.close()
        
        return {
            "id": building_id,
            "name": name,
            "description": desc,
            "coordinates": (lat, lon, alt),
            "type": b_type,
            "floors": floors,
            "rooms": rooms,
            "points_of_interest": pois
        }

# Flask API Endpoints
navigator = CampusNavigator()
landmark_recognizer = LandmarkRecognizer()

@app.route('/api/navigate', methods=['POST'])
def navigate():
    data = request.json
    
    if not data or 'start' not in data or 'destination' not in data:
        return jsonify({"error": "Invalid request parameters"}), 400
    
    try:
        start_coords = (data['start']['latitude'], data['start']['longitude'])
        dest_coords = (data['destination']['latitude'], data['destination']['longitude'])
        
        path = navigator.find_shortest_path(start_coords, dest_coords)
        
        if not path:
            return jsonify({"error": "No path found"}), 404
        
        # Format path for response
        path_points = []
        for i, (lat, lon) in enumerate(path):
            point = {
                "latitude": lat,
                "longitude": lon,
                "step": i + 1
            }
            path_points.append(point)
        
        # Compute the total walking distance once and reuse it
        total_distance = sum(geodesic(path[i], path[i + 1]).meters for i in range(len(path) - 1))
        
        response = {
            "path": path_points,
            "distance": total_distance,  # meters
            "estimated_time": total_distance / 1.4 / 60  # minutes, assuming 1.4 m/s walking speed
        }
        
        return jsonify(response)
    
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/recognize', methods=['POST'])
def recognize_landmark():
    if 'image' not in request.files:
        return jsonify({"error": "No image provided"}), 400
    
    image_file = request.files['image']
    
    try:
        # Read image
        img_array = np.frombuffer(image_file.read(), np.uint8)
        img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
        
        # Recognize landmark
        landmark, confidence = landmark_recognizer.recognize(img)
        
        if not landmark or confidence < 0.6:  # Threshold
            return jsonify({"recognized": False, "message": "No landmark recognized with confidence"})
        
        # Get building information
        building_id = landmark.get("building_id")
        building_info = navigator.get_building_info(building_id)
        
        if not building_info:
            return jsonify({"recognized": True, "confidence": confidence, "message": "Building information not available"})
        
        return jsonify({
            "recognized": True,
            "confidence": confidence,
            "building": building_info
        })
    
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/nearby', methods=['GET'])
def nearby_pois():
    try:
        lat = float(request.args.get('latitude'))
        lon = float(request.args.get('longitude'))
        radius = float(request.args.get('radius', 200))  # Default 200m
        
        nearby = navigator.get_nearby_pois((lat, lon), radius)
        
        return jsonify({"nearby_points": nearby})
    
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/building/<int:building_id>', methods=['GET'])
def building_info(building_id):
    try:
        info = navigator.get_building_info(building_id)
        
        if not info:
            return jsonify({"error": "Building not found"}), 404
        
        return jsonify({"building": info})
    
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Mobile AR Application Code (Android with ARCore, written in Kotlin)
# This would normally live in a separate Android Studio project; it's included
# here as a conceptual sketch of how the mobile client calls the Flask API
"""
import android.os.Bundle
import android.widget.Toast
import androidx.appcompat.app.AppCompatActivity
import com.google.ar.core.*
import com.google.ar.core.exceptions.*
import java.util.concurrent.TimeUnit
import retrofit2.Retrofit
import retrofit2.converter.gson.GsonConverterFactory
import okhttp3.OkHttpClient

class ARNavigationActivity : AppCompatActivity() {
    private lateinit var arSession: Session
    private lateinit var apiService: CampusApiService
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_ar_navigation)
        
        // Setup API service
        val client = OkHttpClient.Builder()
            .connectTimeout(30, TimeUnit.SECONDS)
            .readTimeout(30, TimeUnit.SECONDS)
            .build()
        
        val retrofit = Retrofit.Builder()
            .baseUrl("http://your-server-url/api/")
            .client(client)
            .addConverterFactory(GsonConverterFactory.create())
            .build()
        
        apiService = retrofit.create(CampusApiService::class.java)
        
        // Setup AR session
        setupArSession()
    }
    
    private fun setupArSession() {
        try {
            if (ArCoreApk.getInstance().requestInstall(this, true) == ArCoreApk.InstallStatus.INSTALLED) {
                arSession = Session(this)
                
                val config = Config(arSession)
                config.updateMode = Config.UpdateMode.LATEST_CAMERA_IMAGE
                config.focusMode = Config.FocusMode.AUTO
                
                // Configure geospatial features
                if (arSession.isGeospatialModeSupported(Config.GeospatialMode.ENABLED)) {
                    config.geospatialMode = Config.GeospatialMode.ENABLED
                }
                
                arSession.configure(config)
                
                // Initialize AR renderer
                // ...
            }
        } catch (e: UnavailableArcoreNotInstalledException) {
            // ARCore not installed
            Toast.makeText(this, "ARCore not installed", Toast.LENGTH_LONG).show()
        } catch (e: Exception) {
            // Other errors
            Toast.makeText(this, "Error setting up AR: ${e.message}", Toast.LENGTH_LONG).show()
        }
    }
    
    private fun startNavigation(destination: LatLng) {
        // Get current location
        val earth = arSession.earth
        if (earth?.trackingState == TrackingState.TRACKING) {
            val currentLocation = earth.cameraGeospatialPose
            
            // Request navigation path
            val navigationRequest = NavigationRequest(
                start = LatLng(
                    currentLocation.latitude, 
                    currentLocation.longitude
                ),
                destination = destination
            )
            
            // Call API for navigation
            apiService.navigate(navigationRequest).enqueue(object : Callback<NavigationResponse> {
                override fun onResponse(call: Call<NavigationResponse>, response: Response<NavigationResponse>) {
                    if (response.isSuccessful) {
                        val path = response.body()
                        displayNavigationPath(path)
                    } else {
                        Toast.makeText(this@ARNavigationActivity, "Navigation failed", Toast.LENGTH_SHORT).show()
                    }
                }
                
                override fun onFailure(call: Call<NavigationResponse>, t: Throwable) {
                    Toast.makeText(this@ARNavigationActivity, "Network error: ${t.message}", Toast.LENGTH_SHORT).show()
                }
            })
        } else {
            Toast.makeText(this, "Earth tracking not available", Toast.LENGTH_SHORT).show()
        }
    }
    
    private fun displayNavigationPath(path: NavigationResponse?) {
        // Display AR navigation elements based on the returned path
        // This would involve creating anchors, rendering directional arrows, etc.
        // ...
    }
    
    private fun recognizeLandmark() {
        // Capture current camera frame
        val frame = arSession.update()
        val image = frame.acquireCameraImage()
        
        // Convert to byte array and send to server
        // ...
        
        // Call API for landmark recognition
        apiService.recognizeLandmark(imageBytes).enqueue(object : Callback<RecognitionResponse> {
            override fun onResponse(call: Call<RecognitionResponse>, response: Response<RecognitionResponse>) {
                if (response.isSuccessful && response.body()?.recognized == true) {
                    displayBuildingInfo(response.body()?.building)
                }
            }
            
            override fun onFailure(call: Call<RecognitionResponse>, t: Throwable) {
                Toast.makeText(this@ARNavigationActivity, "Recognition failed: ${t.message}", Toast.LENGTH_SHORT).show()
            }
        })
    }
    
    private fun displayBuildingInfo(building: Building?) {
        // Display building information overlay
        // ...
    }
    
    override fun onResume() {
        super.onResume()
        arSession.resume()
    }
    
    override fun onPause() {
        super.onPause()
        arSession.pause()
    }
}
"""

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)
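
With the Flask server running locally, you can smoke-test the API from a second terminal. This is a minimal example client, not part of the project code: it assumes the requests library is installed and reuses the coordinates seeded by load_sample_data.

# client_example.py -- minimal example client for the local dev server
import requests

BASE = "http://127.0.0.1:5000/api"

# Request a walking route from the Main Academic Building to the Sports Complex
resp = requests.post(f"{BASE}/navigate", json={
    "start": {"latitude": 40.7128, "longitude": -74.0060},
    "destination": {"latitude": 40.7120, "longitude": -74.0080}
})
print(resp.json())

# List points of interest within 300 m of the Main Academic Building
resp = requests.get(f"{BASE}/nearby", params={
    "latitude": 40.7128, "longitude": -74.0060, "radius": 300
})
print(resp.json())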


6. Sentiment-Based Course Feedback Analysis System

A futuristic digital interface displaying a Sentiment-Based Course Feedback Analysis System. The screen shows student feedback categorized into positive, neutral, and negative sentiments with AI-driven emotion analysis. A professor reviews the insights on a holographic dashboard with visual graphs, emoji-based sentiment trends, and keyword highlights. The background features a high-tech classroom with students using tablets and smart devices.

Difficulty Level: Intermediate

Skills Developed: Natural Language Processing, Sentiment Analysis, Data Visualization, Web Development

Project Description: Develop a system that analyzes student feedback for courses using sentiment analysis and natural language processing. This tool helps educational institutions understand student satisfaction, identify areas for improvement, and track changes over time.

Key Features:

  • Text preprocessing and cleaning
  • Sentiment analysis of feedback comments
  • Topic modeling to identify common themes
  • Trend analysis across semesters
  • Interactive visualization dashboard
  • Recommendation engine for course improvements

Implementation Steps:

  1. Collect and preprocess sample course feedback data
  2. Implement sentiment analysis models using NLTK or spaCy
  3. Develop topic modeling using Latent Dirichlet Allocation
  4. Create data visualization components
  5. Build a web interface using Django or Flask
  6. Implement the recommendation engine

Sample Code Implementation:

# feedback_analysis.py
import pandas as pd
import numpy as np
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.sentiment import SentimentIntensityAnalyzer
import gensim
from gensim import corpora
from gensim.models import LdaModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import NMF
import matplotlib.pyplot as plt
import seaborn as sns
from flask import Flask, render_template, request, jsonify
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import json
import os
from datetime import datetime
from wordcloud import WordCloud

# Download necessary NLTK resources
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('vader_lexicon')

app = Flask(__name__)

class FeedbackAnalyzer:
    def __init__(self):
        self.stop_words = set(stopwords.words('english'))
        self.lemmatizer = WordNetLemmatizer()
        self.sia = SentimentIntensityAnalyzer()
        
        # Add domain-specific stop words
        self.stop_words.update(['course', 'class', 'professor', 'instructor', 'student', 'lecture'])
    
    def load_data(self, file_path):
        """Load feedback data from CSV file."""
        if file_path.endswith('.csv'):
            return pd.read_csv(file_path)
        elif file_path.endswith('.xlsx'):
            return pd.read_excel(file_path)
        else:
            raise ValueError("Unsupported file format. Please use CSV or Excel files.")
    
    def preprocess_text(self, text):
        """Clean and preprocess text data."""
        if not isinstance(text, str):
            return ""
        
        # Convert to lowercase
        text = text.lower()
        
        # Remove special characters and numbers
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        
        # Tokenize
        tokens = word_tokenize(text)
        
        # Remove stopwords and lemmatize
        processed_tokens = [self.lemmatizer.lemmatize(token) for token in tokens 
                           if token not in self.stop_words and len(token) > 2]
        
        return ' '.join(processed_tokens)
    
    def analyze_sentiment(self, text):
        """Analyze sentiment of text using VADER."""
        if not isinstance(text, str) or not text.strip():
            return {'compound': 0, 'pos': 0, 'neu': 0, 'neg': 0}
        
        return self.sia.polarity_scores(text)
    
    def classify_sentiment(self, compound_score):
        """Classify sentiment based on compound score."""
        if compound_score >= 0.05:
            return 'positive'
        elif compound_score <= -0.05:
            return 'negative'
        else:
            return 'neutral'
    
    def extract_topics(self, texts, num_topics=5, method='lda'):
        """Extract topics from a collection of texts."""
        if not texts or all(not isinstance(text, str) or not text.strip() for text in texts):
            return [], []
        
        # Filter out empty texts
        texts = [text for text in texts if isinstance(text, str) and text.strip()]
        
        if method == 'lda':
            # Tokenize texts
            tokenized_texts = [text.split() for text in texts]
            
            # Create dictionary and corpus
            dictionary = corpora.Dictionary(tokenized_texts)
            corpus = [dictionary.doc2bow(text) for text in tokenized_texts]
            
            # Train LDA model
            lda_model = LdaModel(
                corpus=corpus,
                id2word=dictionary,
                num_topics=num_topics,
                passes=15,
                alpha='auto',
                random_state=42
            )
            
            # Extract topics
            topics = lda_model.print_topics(num_words=10)
            
            # Map documents to topics
            doc_topics = []
            for doc in corpus:
                topic_probs = lda_model.get_document_topics(doc)
                doc_topics.append(max(topic_probs, key=lambda x: x[1])[0] if topic_probs else -1)
            
            return topics, doc_topics
        
        elif method == 'nmf':
            # Use Non-negative Matrix Factorization for topic modeling
            vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
            tfidf = vectorizer.fit_transform(texts)
            
            # Train NMF model
            nmf_model = NMF(n_components=num_topics, random_state=42)
            nmf_model.fit(tfidf)
            
            # Extract topics
            feature_names = vectorizer.get_feature_names_out()
            topics = []
            for topic_idx, topic in enumerate(nmf_model.components_):
                top_features_idx = topic.argsort()[:-11:-1]
                top_features = [feature_names[i] for i in top_features_idx]
                topic_str = ' '.join(top_features)
                topics.append((topic_idx, topic_str))
            
            # Map documents to topics
            doc_topic_matrix = nmf_model.transform(tfidf)
            doc_topics = doc_topic_matrix.argmax(axis=1).tolist()
            
            return topics, doc_topics
        
        else:
            raise ValueError("Unsupported topic extraction method. Use 'lda' or 'nmf'.")
    
    def analyze_feedback(self, data, comment_column, course_column=None, instructor_column=None, 
                        semester_column=None, rating_column=None, student_id_column=None):
        """
        Analyze feedback data and return results.
        
        Parameters:
        - data: DataFrame with feedback data
        - comment_column: Column name containing feedback text
        - course_column: Column name for course identification
        - instructor_column: Column name for instructor identification
        - semester_column: Column name for semester/term identification
        - rating_column: Column name for numerical ratings
        - student_id_column: Column name for student identification
        
        Returns:
        - DataFrame with analysis results
        """
        # Copy data to avoid modifying original
        df = data.copy()
        
        # Ensure comment column exists
        if comment_column not in df.columns:
            raise ValueError(f"Comment column '{comment_column}' not found in data.")
        
        # Preprocess comments
        df['processed_comment'] = df[comment_column].apply(self.preprocess_text)
        
        # Analyze sentiment
        sentiment_scores = df[comment_column].apply(self.analyze_sentiment)
        df['sentiment_compound'] = sentiment_scores.apply(lambda x: x['compound'])
        df['sentiment_positive'] = sentiment_scores.apply(lambda x: x['pos'])
        df['sentiment_neutral'] = sentiment_scores.apply(lambda x: x['neu'])
        df['sentiment_negative'] = sentiment_scores.apply(lambda x: x['neg'])
        df['sentiment_category'] = df['sentiment_compound'].apply(self.classify_sentiment)
        
        # Extract topics if there are enough valid comments
        valid_mask = df['processed_comment'].apply(
            lambda t: isinstance(t, str) and bool(t.strip()))
        valid_comments = df.loc[valid_mask, 'processed_comment'].tolist()
        if len(valid_comments) >= 5:  # Minimum number for meaningful topic extraction
            topics, doc_topics = self.extract_topics(valid_comments)
            
            # doc_topics is aligned with valid_comments, so zip it back to the
            # original row labels; re-enumerating the full index would misalign
            # the two sequences whenever some rows were filtered out
            topic_assignments = dict(zip(df.index[valid_mask], doc_topics))
            
            df['topic'] = df.index.map(lambda x: topic_assignments.get(x, -1))
            
            # Create topic descriptions
            topic_descriptions = {}
            for topic_idx, topic_terms in topics:
                # Extract main terms from topic, removing numbers and formatting
                terms = re.findall(r'"([^"]*)"', topic_terms)
                if terms:
                    topic_descriptions[topic_idx] = ', '.join(terms[:5])  # Top 5 terms
                else:
                    topic_descriptions[topic_idx] = f"Topic {topic_idx+1}"
            
            df['topic_description'] = df['topic'].map(lambda x: topic_descriptions.get(x, "No Topic") if x != -1 else "No Topic")
        else:
            df['topic'] = -1
            df['topic_description'] = "Insufficient data for topic modeling"
        
        # Group analysis by course, instructor, or semester if specified
        results = {'overall': df}
        
        if course_column and course_column in df.columns:
            results['by_course'] = df.groupby(course_column).agg({
                'sentiment_compound': 'mean',
                'sentiment_positive': 'mean',
                'sentiment_neutral': 'mean',
                'sentiment_negative': 'mean',
                comment_column: 'count'
            }).reset_index()
        
        if instructor_column and instructor_column in df.columns:
            results['by_instructor'] = df.groupby(instructor_column).agg({
                'sentiment_compound': 'mean',
                'sentiment_positive': 'mean',
                'sentiment_neutral': 'mean',
                'sentiment_negative': 'mean',
                comment_column: 'count'
            }).reset_index()
        
        if semester_column and semester_column in df.columns:
            results['by_semester'] = df.groupby(semester_column).agg({
                'sentiment_compound': 'mean',
                'sentiment_positive': 'mean',
                'sentiment_neutral': 'mean',
                'sentiment_negative': 'mean',
                comment_column: 'count'
            }).reset_index()
        
        if rating_column and rating_column in df.columns:
            # Calculate correlation between ratings and sentiment
            results['rating_correlation'] = df[[rating_column, 'sentiment_compound']].corr().iloc[0, 1]
            
            # Group by rating
            results['by_rating'] = df.groupby(rating_column).agg({
                'sentiment_compound': 'mean',
                comment_column: 'count'
            }).reset_index()
        
        return results
    
    def generate_recommendations(self, analysis_results):
        """Generate recommendations based on analysis results."""
        recommendations = []
        
        # Get overall data
        df = analysis_results['overall']
        
        # Find most negative comments
        negative_comments = df[df['sentiment_category'] == 'negative'].sort_values('sentiment_compound')
        
        if not negative_comments.empty:
            # Analyze most common topics in negative feedback
            topic_counts = negative_comments['topic_description'].value_counts()
            
            if not topic_counts.empty:
                # Recommend addressing top negative topics
                for topic, count in topic_counts.head(3).items():
                    if topic != "No Topic":
                        recommendations.append(f"Address concerns related to '{topic}' which appears in {count} negative comments.")
        
        # Check course trends if available
        if 'by_course' in analysis_results:
            course_data = analysis_results['by_course']
            # Find courses with lowest sentiment
            low_courses = course_data.sort_values('sentiment_compound').head(3)
            
            for _, row in low_courses.iterrows():
                recommendations.append(f"Review course '{row.iloc[0]}' which has a low sentiment score of {row['sentiment_compound']:.2f}.")
        
        # Check instructor trends if available
        if 'by_instructor' in analysis_results:
            instructor_data = analysis_results['by_instructor']
            # Find instructors with lowest sentiment
            low_instructors = instructor_data.sort_values('sentiment_compound').head(3)
            
            for _, row in low_instructors.iterrows():
                recommendations.append(f"Provide support to instructor '{row.iloc[0]}' whose feedback has a sentiment score of {row['sentiment_compound']:.2f}.")
        
        # Check semester trends if available
        if 'by_semester' in analysis_results:
            # Sort chronologically by the semester column (the first column after
            # reset_index) rather than by sentiment, so the trend check is meaningful
            semester_data = analysis_results['by_semester']
            semester_data = semester_data.sort_values(semester_data.columns[0])
            
            # Check if sentiment is declining from the earliest to the latest semester
            if len(semester_data) > 1 and semester_data.iloc[-1]['sentiment_compound'] < semester_data.iloc[0]['sentiment_compound']:
                recommendations.append("Sentiment appears to be declining over time. Consider a comprehensive review of curriculum and teaching methods.")
        
        # General recommendations if specific ones couldn't be generated
        if not recommendations:
            recommendations = [
                "Collect more detailed feedback to enable better analysis.",
                "Consider adding structured questions to complement open-ended feedback.",
                "Implement regular check-ins during the semester rather than end-of-term feedback only."
            ]
        
        return recommendations
    
    def generate_visualizations(self, analysis_results):
        """Generate visualization data for dashboard."""
        visualizations = {}
        
        # Overall sentiment distribution
        sentiment_counts = analysis_results['overall']['sentiment_category'].value_counts().reset_index()
        sentiment_counts.columns = ['category', 'count']
        
        visualizations['sentiment_distribution'] = {
            'data': sentiment_counts.to_dict('records'),
            'layout': {
                'title': 'Overall Sentiment Distribution',
                'xaxis': {'title': 'Sentiment Category'},
                'yaxis': {'title': 'Count'}
            }
        }
        
        # Topic distribution
        topic_counts = analysis_results['overall']['topic_description'].value_counts().reset_index()
        topic_counts.columns = ['topic', 'count']
        topic_counts = topic_counts[topic_counts['topic'] != 'No Topic']  # Filter out no topic
        
        if not topic_counts.empty:
            visualizations['topic_distribution'] = {
                'data': topic_counts.to_dict('records'),
                'layout': {
                    'title': 'Topic Distribution in Feedback',
                    'xaxis': {'title': 'Topic'},
                    'yaxis': {'title': 'Count'}
                }
            }
        
        # Sentiment by topic
        if 'topic_description' in analysis_results['overall'].columns:
            # Compute both the mean sentiment and the comment count per topic
            topic_sentiment = analysis_results['overall'].groupby('topic_description')['sentiment_compound'].agg(['mean', 'count']).reset_index()
            
            topic_sentiment.columns = ['topic', 'avg_sentiment', 'count']
            topic_sentiment = topic_sentiment[topic_sentiment['topic'] != 'No Topic']
            
            if not topic_sentiment.empty:
                visualizations['sentiment_by_topic'] = {
                    'data': topic_sentiment.to_dict('records'),
                    'layout': {
                        'title': 'Average Sentiment by Topic',
                        'xaxis': {'title': 'Topic'},
                        'yaxis': {'title': 'Average Sentiment Score'}
                    }
                }
        
        # Trend analysis if semester data available
        if 'by_semester' in analysis_results:
            semester_data = analysis_results['by_semester']
            
            visualizations['sentiment_trend'] = {
                'data': semester_data.to_dict('records'),
                'layout': {
                    'title': 'Sentiment Trend Over Time',
                    'xaxis': {'title': 'Semester'},
                    'yaxis': {'title': 'Average Sentiment Score'}
                }
            }
        
        # Course comparison if course data available
        if 'by_course' in analysis_results:
            course_data = analysis_results['by_course']
            
            visualizations['course_comparison'] = {
                'data': course_data.to_dict('records'),
                'layout': {
                    'title': 'Sentiment Comparison by Course',
                    'xaxis': {'title': 'Course'},
                    'yaxis': {'title': 'Average Sentiment Score'}
                }
            }
        
        return visualizations

# Flask web application routes
analyzer = FeedbackAnalyzer()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/upload', methods=['POST'])
def upload_file():
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'})
    
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'})
    
    # Save file temporarily
    filename = f"temp_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"
    file_path = os.path.join('uploads', filename)
    os.makedirs('uploads', exist_ok=True)
    file.save(file_path)
    
    # Get column mappings from form
    comment_column = request.form.get('comment_column')
    course_column = request.form.get('course_column')
    instructor_column = request.form.get('instructor_column')
    semester_column = request.form.get('semester_column')
    rating_column = request.form.get('rating_column')
    
    try:
        # Load data
        data = analyzer.load_data(file_path)
        
        # Preview data for column selection if not provided
        if not comment_column:
            columns = data.columns.tolist()
            return jsonify({
                'status': 'column_selection',
                'columns': columns,
                'filename': filename
            })
        
        # Analyze feedback
        analysis_results = analyzer.analyze_feedback(
            data, 
            comment_column, 
            course_column, 
            instructor_column, 
            semester_column, 
            rating_column
        )
        
        # Generate recommendations
        recommendations = analyzer.generate_recommendations(analysis_results)
        
        # Generate visualizations
        visualizations = analyzer.generate_visualizations(analysis_results)
        
        # Prepare sample comments
        positive_comments = analysis_results['overall'][analysis_results['overall']['sentiment_category'] == 'positive']
        negative_comments = analysis_results['overall'][analysis_results['overall']['sentiment_category'] == 'negative']
        
        sample_positive = positive_comments.sort_values('sentiment_compound', ascending=False).head(5)[comment_column].tolist()
        sample_negative = negative_comments.sort_values('sentiment_compound').head(5)[comment_column].tolist()
        
        # Prepare overall stats
        stats = {
            'total_comments': len(analysis_results['overall']),
            'positive_pct': (analysis_results['overall']['sentiment_category'] == 'positive').mean() * 100,
            'neutral_pct': (analysis_results['overall']['sentiment_category'] == 'neutral').mean() * 100,
            'negative_pct': (analysis_results['overall']['sentiment_category'] == 'negative').mean() * 100,
            'avg_sentiment': analysis_results['overall']['sentiment_compound'].mean()
        }
        
        # Clean up temporary file
        os.remove(file_path)
        
        return jsonify({
            'status': 'success',
            'stats': stats,
            'recommendations': recommendations,
            'visualizations': visualizations,
            'sample_positive': sample_positive,
            'sample_negative': sample_negative
        })
    
    except Exception as e:
        # Clean up temporary file
        if os.path.exists(file_path):
            os.remove(file_path)
        
        return jsonify({'error': str(e)})

@app.route('/analyze', methods=['POST'])
def analyze():
    filename = request.form.get('filename')
    comment_column = request.form.get('comment_column')
    course_column = request.form.get('course_column')
    instructor_column = request.form.get('instructor_column')
    semester_column = request.form.get('semester_column')
    rating_column = request.form.get('rating_column')
    
    file_path = os.path.join('uploads', filename)
    
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found. Please upload again.'})
    
    try:
        # Load data
        data = analyzer.load_data(file_path)
        
        # Analyze feedback
        analysis_results = analyzer.analyze_feedback(
            data, 
            comment_column, 
            course_column, 
            instructor_column, 
            semester_column, 
            rating_column
        )
        
        # Generate recommendations
        recommendations = analyzer.generate_recommendations(analysis_results)
        
        # Generate visualizations
        visualizations = analyzer.generate_visualizations(analysis_results)
        
        # Prepare sample comments
        positive_comments = analysis_results['overall'][analysis_results['overall']['sentiment_category'] == 'positive']
        negative_comments = analysis_results['overall'][analysis_results['overall']['sentiment_category'] == 'negative']
        
        sample_positive = positive_comments.sort_values('sentiment_compound', ascending=False).head(5)[comment_column].tolist()
        sample_negative = negative_comments.sort_values('sentiment_compound').head(5)[comment_column].tolist()
        
        # Prepare overall stats
        stats = {
            'total_comments': len(analysis_results['overall']),
            'positive_pct': (analysis_results['overall']['sentiment_category'] == 'positive').mean() * 100,
            'neutral_pct': (analysis_results['overall']['sentiment_category'] == 'neutral').mean() * 100,
            'negative_pct': (analysis_results['overall']['sentiment_category'] == 'negative').mean() * 100,
            'avg_sentiment': analysis_results['overall']['sentiment_compound'].mean()
        }
        
        # Clean up temporary file
        os.remove(file_path)
        
        return jsonify({
            'status': 'success',
            'stats': stats,
            'recommendations': recommendations,
            'visualizations': visualizations,
            'sample_positive': sample_positive,
            'sample_negative': sample_negative
        })
    
    except Exception as e:
        # Clean up temporary file
        if os.path.exists(file_path):
            os.remove(file_path)
        
        return jsonify({'error': str(e)})

if __name__ == '__main__':
    app.run(debug=True)
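
To exercise the service locally, you can post a CSV to the /upload endpoint with a short client script. Here is a minimal sketch, assuming the app is running on Flask's default port and a feedback.csv file whose column names match the ones below (both the file and its column names are assumptions; adjust them to your own data):

# try_upload.py -- hypothetical client for the /upload endpoint above
import requests

with open('feedback.csv', 'rb') as f:  # assumed sample file
    response = requests.post(
        'http://127.0.0.1:5000/upload',
        files={'file': f},
        data={
            'comment_column': 'comment',    # column holding free-text feedback
            'course_column': 'course',      # optional groupings; omit if absent
            'semester_column': 'semester',
        },
    )

result = response.json()
if result.get('status') == 'success':
    stats = result['stats']
    print(f"Analyzed {stats['total_comments']} comments; "
          f"average sentiment {stats['avg_sentiment']:.2f}")
    for rec in result['recommendations']:
        print('-', rec)
else:
    print('Error:', result.get('error'))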


7. Automated Plagiarism Detection System with Code Analysis

A futuristic digital interface showcasing an Automated Plagiarism Detection System with Code Analysis. The screen displays lines of programming code being scanned for similarities, with AI-powered detection highlighting copied segments. A professor or student reviews plagiarism reports on a holographic dashboard with percentage match scores, similarity heatmaps, and real-time analysis indicators. The background features a high-tech classroom with students coding on laptops and smart devices.

Difficulty Level: Intermediate to Advanced

Skills Developed: Text Analysis, Algorithm Design, Code Parsing, Machine Learning

Project Description: Create a comprehensive plagiarism detection system specifically designed for programming assignments. Unlike simple text-matching tools, this system understands code structure, can detect logic similarities even when variable names change, and provides detailed reports on potential plagiarism instances.

Key Features:

  • Code tokenization and normalization
  • Algorithm similarity detection
  • Natural language explanation comparison
  • Source code repository integration
  • Detailed similarity reports
  • Learning capability to improve detection over time

Implementation Steps:

  1. Research code similarity algorithms
  2. Implement code parsing and tokenization
  3. Develop similarity detection algorithms
  4. Create a database of code submissions
  5. Build a web interface for submission and reporting
  6. Implement machine learning for improved detection

Sample Code Implementation:

# plagiarism_detector.py
import os
import re
import ast
import difflib
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import seaborn as sns
from flask import Flask, render_template, request, jsonify
import sqlite3
import hashlib
import datetime
import json
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter
import requests
from github import Github
import tokenize
from io import BytesIO

app = Flask(__name__)

class CodeProcessor:
    """Process code for plagiarism detection."""
    
    def __init__(self):
        self.python_lexer = PythonLexer()
        self.html_formatter = HtmlFormatter()
    
    def normalize_python_code(self, code):
        """
        Normalize Python code by:
        1. Removing comments
        2. Standardizing variable names
        3. Removing whitespace
        4. Normalizing function names
        """
        try:
            # Parse the code into an AST
            tree = ast.parse(code)
            
            # Create a visitor to normalize variables and function names
            normalizer = ASTNormalizer()
            normalized_tree = normalizer.visit(tree)
            
            # Convert back to code (ast.unparse requires Python 3.9+)
            normalized_code = ast.unparse(normalized_tree)
            
            # Remove comments and whitespace
            normalized_code = re.sub(r'#.*$', '', normalized_code, flags=re.MULTILINE)
            normalized_code = re.sub(r'\s+', ' ', normalized_code)
            
            return normalized_code
        except SyntaxError:
            # If code can't be parsed, do basic normalization
            return self._basic_code_normalization(code)
    
    def _basic_code_normalization(self, code):
        """Basic normalization for when AST parsing fails."""
        # Remove comments
        code = re.sub(r'#.*$', '', code, flags=re.MULTILINE)
        
        # Remove docstrings
        code = re.sub(r'""".*?"""', '', code, flags=re.DOTALL)
        code = re.sub(r"'''.*?'''", '', code, flags=re.DOTALL)
        
        # Normalize whitespace
        code = re.sub(r'\s+', ' ', code)
        
        # Try to normalize variable names
        # This is a simplified approach - AST parsing would be more robust
        words = set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code))
        replacement_map = {}
        
        var_counter = 0
        func_counter = 0
        
        for word in words:
            # Skip Python keywords and built-ins
            if word in {'if', 'else', 'for', 'while', 'def', 'class', 'return',
                       'import', 'from', 'as', 'try', 'except', 'finally',
                       'print', 'True', 'False', 'None', 'and', 'or', 'not',
                       'in', 'is', 'with', 'range', 'len', 'int', 'str', 'float',
                       'list', 'dict', 'set', 'tuple'}:
                continue
            
            # Check if it's likely a function name (followed by parentheses)
            if re.search(r'\b' + re.escape(word) + r'\s*\(', code):
                replacement = f'func_{func_counter}'
                func_counter += 1
            else:
                replacement = f'var_{var_counter}'
                var_counter += 1
            
            replacement_map[word] = replacement
        
        # Replace identifiers
        normalized_code = code
        for original, replacement in replacement_map.items():
            normalized_code = re.sub(r'\b' + re.escape(original) + r'\b', replacement, normalized_code)
        
        return normalized_code
    
    def tokenize_code(self, code):
        """Tokenize code into a list of meaningful tokens."""
        try:
            tokens = []
            # Create a BytesIO object from the string content
            code_bytes = BytesIO(code.encode('utf-8'))
            
            # Tokenize the code
            for tok in tokenize.tokenize(code_bytes.readline):
                # Skip comments, whitespace and encoding tokens
                if tok.type not in {tokenize.COMMENT, tokenize.NL, tokenize.NEWLINE, tokenize.INDENT, 
                                    tokenize.DEDENT, tokenize.ENCODING}:
                    tokens.append((tokenize.tok_name[tok.type], tok.string))
            
            return tokens
        except tokenize.TokenError:
            # Fall back to simple splitting if tokenize fails
            return [('WORD', word) for word in re.findall(r'\b\w+\b', code)]
    
    def extract_code_structure(self, code):
        """Extract the structural elements of the code."""
        try:
            tree = ast.parse(code)
            
            # Extract information about classes, functions, and control structures
            structure_visitor = StructureVisitor()
            structure_visitor.visit(tree)
            
            return structure_visitor.structure
        except SyntaxError:
            # If AST parsing fails, fall back to regex-based extraction
            structure = {
                'classes': len(re.findall(r'\bclass\s+\w+', code)),
                'functions': len(re.findall(r'\bdef\s+\w+', code)),
                'if_statements': len(re.findall(r'\bif\s+', code)),
                'for_loops': len(re.findall(r'\bfor\s+', code)),
                'while_loops': len(re.findall(r'\bwhile\s+', code))
            }
            return structure
    
    def highlight_code(self, code):
        """Highlight code for HTML display."""
        return highlight(code, self.python_lexer, self.html_formatter)
    
    def generate_code_fingerprint(self, code):
        """Generate a fingerprint for code that captures its essence."""
        # Normalize code
        normalized_code = self.normalize_python_code(code)
        
        # Extract structure
        structure = self.extract_code_structure(code)
        
        # Create a fingerprint dictionary
        fingerprint = {
            'code_hash': hashlib.md5(normalized_code.encode()).hexdigest(),
            'structure': structure,
            'token_count': len(self.tokenize_code(code)),
            'normalized_code': normalized_code
        }
        
        return fingerprint

class ASTNormalizer(ast.NodeTransformer):
    """AST visitor to normalize variable and function names."""
    
    def __init__(self):
        self.var_map = {}
        self.func_map = {}
        self.class_map = {}
        self.var_counter = 0
        self.func_counter = 0
        self.class_counter = 0
    
    def visit_Name(self, node):
        if isinstance(node.ctx, ast.Store):
            # This is a variable definition
            if node.id not in self.var_map:
                self.var_map[node.id] = f'var_{self.var_counter}'
                self.var_counter += 1
            node.id = self.var_map[node.id]
        elif isinstance(node.ctx, ast.Load):
            # This is a variable usage
            if node.id in self.var_map:
                node.id = self.var_map[node.id]
            # Don't normalize built-ins and imports
        return node
    
    def visit_FunctionDef(self, node):
        # Normalize function name
        if node.name not in self.func_map:
            self.func_map[node.name] = f'func_{self.func_counter}'
            self.func_counter += 1
        node.name = self.func_map[node.name]
        
        # Process the function body with the standard transformer traversal
        self.generic_visit(node)
        return node
    
    def visit_ClassDef(self, node):
        # Normalize class name
        if node.name not in self.class_map:
            self.class_map[node.name] = f'class_{self.class_counter}'
            self.class_counter += 1
        node.name = self.class_map[node.name]
        
        # Process the class body with the standard transformer traversal
        self.generic_visit(node)
        return node

class StructureVisitor(ast.NodeVisitor):
    """AST visitor to extract code structure information."""
    
    def __init__(self):
        self.structure = {
            'classes': 0,
            'functions': 0,
            'methods': 0,
            'if_statements': 0,
            'for_loops': 0,
            'while_loops': 0,
            'try_blocks': 0,
            'imports': 0,
            'assignments': 0,
            'returns': 0,
            'function_calls': 0,
            'list_comprehensions': 0,
            'max_nesting_depth': 0,
            'current_nesting_depth': 0
        }
        # Track which FunctionDef nodes are direct children of a class (methods)
        self._method_nodes = set()
    
    def visit_ClassDef(self, node):
        self.structure['classes'] += 1
        # Record direct function definitions so visit_FunctionDef can
        # distinguish methods from module-level functions
        self._method_nodes.update(
            child for child in node.body if isinstance(child, ast.FunctionDef)
        )
        self.generic_visit(node)
    
    def visit_FunctionDef(self, node):
        # Functions recorded as direct class children are methods
        if node in self._method_nodes:
            self.structure['methods'] += 1
        else:
            self.structure['functions'] += 1
        
        # Visit function body with increased nesting depth
        self.structure['current_nesting_depth'] += 1
        self.structure['max_nesting_depth'] = max(
            self.structure['max_nesting_depth'], 
            self.structure['current_nesting_depth']
        )
        
        self.generic_visit(node)
        
        # Restore nesting depth
        self.structure['current_nesting_depth'] -= 1
    
    def visit_If(self, node):
        self.structure['if_statements'] += 1
        
        # Increase nesting for the body
        self.structure['current_nesting_depth'] += 1
        self.structure['max_nesting_depth'] = max(
            self.structure['max_nesting_depth'], 
            self.structure['current_nesting_depth']
        )
        
        self.generic_visit(node)
        
        # Restore nesting depth
        self.structure['current_nesting_depth'] -= 1
    
    def visit_For(self, node):
        self.structure['for_loops'] += 1
        
        # Increase nesting for the loop body
        self.structure['current_nesting_depth'] += 1
        self.structure['max_nesting_depth'] = max(
            self.structure['max_nesting_depth'], 
            self.structure['current_nesting_depth']
        )
        
        self.generic_visit(node)
        
        # Restore nesting depth
        self.structure['current_nesting_depth'] -= 1
    
    def visit_While(self, node):
        self.structure['while_loops'] += 1
        
        # Increase nesting for the loop body
        self.structure['current_nesting_depth'] += 1
        self.structure['max_nesting_depth'] = max(
            self.structure['max_nesting_depth'], 
            self.structure['current_nesting_depth']
        )
        
        self.generic_visit(node)
        
        # Restore nesting depth
        self.structure['current_nesting_depth'] -= 1
    
    def visit_Try(self, node):
        self.structure['try_blocks'] += 1
        self.generic_visit(node)
    
    def visit_Import(self, node):
        self.structure['imports'] += len(node.names)
        self.generic_visit(node)
    
    def visit_ImportFrom(self, node):
        self.structure['imports'] += len(node.names)
        self.generic_visit(node)
    
    def visit_Assign(self, node):
        self.structure['assignments'] += 1
        self.generic_visit(node)
    
    def visit_Return(self, node):
        self.structure['returns'] += 1
        self.generic_visit(node)
    
    def visit_Call(self, node):
        self.structure['function_calls'] += 1
        self.generic_visit(node)
    
    def visit_ListComp(self, node):
        self.structure['list_comprehensions'] += 1
        self.generic_visit(node)

class PlagiarismDetector:
    """Detect plagiarism in code submissions."""
    
    def __init__(self, db_path='plagiarism.db'):
        self.db_path = db_path
        self.code_processor = CodeProcessor()
        self.init_db()
    
    def init_db(self):
        """Initialize the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Create submissions table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS submissions (
            id INTEGER PRIMARY KEY,
            assignment_id TEXT,
            student_id TEXT,
            submission_time TIMESTAMP,
            code_hash TEXT,
            code_text TEXT,
            normalized_code TEXT,
            fingerprint TEXT
        )
        ''')
        
        # Create comparison results table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS comparisons (
            id INTEGER PRIMARY KEY,
            submission1_id INTEGER,
            submission2_id INTEGER,
            similarity_score REAL,
            comparison_type TEXT,
            comparison_time TIMESTAMP,
            details TEXT,
            FOREIGN KEY (submission1_id) REFERENCES submissions (id),
            FOREIGN KEY (submission2_id) REFERENCES submissions (id)
        )
        ''')
        
        conn.commit()
        conn.close()
    
    def add_submission(self, assignment_id, student_id, code_text):
        """Add a new code submission to the database."""
        # Process the code
        fingerprint = self.code_processor.generate_code_fingerprint(code_text)
        
        # Store in database
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
        INSERT INTO submissions 
        (assignment_id, student_id, submission_time, code_hash, code_text, normalized_code, fingerprint)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            assignment_id,
            student_id,
            datetime.datetime.now(),
            fingerprint['code_hash'],
            code_text,
            fingerprint['normalized_code'],
            json.dumps(fingerprint)
        ))
        
        submission_id = cursor.lastrowid
        
        conn.commit()
        conn.close()
        
        return submission_id
    
    def compare_submissions(self, submission1_id, submission2_id):
        """Compare two submissions and calculate similarity scores."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Get submission data
        cursor.execute('''
        SELECT code_text, normalized_code, fingerprint
        FROM submissions
        WHERE id = ?
        ''', (submission1_id,))
        
        sub1_data = cursor.fetchone()
        
        cursor.execute('''
        SELECT code_text, normalized_code, fingerprint
        FROM submissions
        WHERE id = ?
        ''', (submission2_id,))
        
        sub2_data = cursor.fetchone()
        
        if not sub1_data or not sub2_data:
            conn.close()
            return None
        
        code1, norm_code1, fingerprint1 = sub1_data
        code2, norm_code2, fingerprint2 = sub2_data
        
        fingerprint1 = json.loads(fingerprint1)
        fingerprint2 = json.loads(fingerprint2)
        
        # Calculate similarity scores using different methods
        
        # 1. Normalized code similarity (TF-IDF + Cosine)
        vectorizer = TfidfVectorizer()
        try:
            tfidf_matrix = vectorizer.fit_transform([norm_code1, norm_code2])
            code_similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
        except ValueError:
            # fit_transform raises ValueError when the vocabulary is empty
            code_similarity = 0
        
        # 2. Structure similarity
        structure1 = fingerprint1['structure']
        structure2 = fingerprint2['structure']
        
        # Convert structures to vectors
        structure_vec1 = np.array([
            structure1.get('classes', 0),
            structure1.get('functions', 0),
            structure1.get('methods', 0),
            structure1.get('if_statements', 0),
            structure1.get('for_loops', 0),
            structure1.get('while_loops', 0),
            structure1.get('try_blocks', 0),
            structure1.get('imports', 0),
            structure1.get('assignments', 0),
            structure1.get('returns', 0),
            structure1.get('function_calls', 0),
            structure1.get('list_comprehensions', 0),
            structure1.get('max_nesting_depth', 0)
        ])
        
        structure_vec2 = np.array([
            structure2.get('classes', 0),
            structure2.get('functions', 0),
            structure2.get('methods', 0),
            structure2.get('if_statements', 0),
            structure2.get('for_loops', 0),
            structure2.get('while_loops', 0),
            structure2.get('try_blocks', 0),
            structure2.get('imports', 0),
            structure2.get('assignments', 0),
            structure2.get('returns', 0),
            structure2.get('function_calls', 0),
            structure2.get('list_comprehensions', 0),
            structure2.get('max_nesting_depth', 0)
        ])
        
        # Normalize vectors
        norm1 = np.linalg.norm(structure_vec1)
        norm2 = np.linalg.norm(structure_vec2)
        
        if norm1 > 0 and norm2 > 0:
            structure_vec1 = structure_vec1 / norm1
            structure_vec2 = structure_vec2 / norm2
            structure_similarity = np.dot(structure_vec1, structure_vec2)
        else:
            structure_similarity = 0
        
        # 3. Sequence similarity using diff
        sequence_similarity = difflib.SequenceMatcher(None, norm_code1, norm_code2).ratio()
        
        # Calculate overall similarity score (weighted average)
        overall_similarity = (0.5 * code_similarity + 
                             0.3 * structure_similarity + 
                             0.2 * sequence_similarity)
        
        # Store comparison results
        details = {
            'code_similarity': code_similarity,
            'structure_similarity': structure_similarity,
            'sequence_similarity': sequence_similarity,
            'diff_highlights': [],  # Placeholder for diff highlights
            'structure_comparison': {
                'structure1': structure1,
                'structure2': structure2
            }
        }
        
        # Generate diff highlights for user-friendly display
        diff = difflib.ndiff(norm_code1.splitlines(), norm_code2.splitlines())
        diff_highlights = list(diff)
        details['diff_highlights'] = diff_highlights
        
        cursor.execute('''
        INSERT INTO comparisons
        (submission1_id, submission2_id, similarity_score, comparison_type, comparison_time, details)
        VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            submission1_id,
            submission2_id,
            overall_similarity,
            'code',
            datetime.datetime.now(),
            json.dumps(details)
        ))
        
        comparison_id = cursor.lastrowid
        
        conn.commit()
        conn.close()
        
        return {
            'id': comparison_id,
            'overall_similarity': overall_similarity,
            'code_similarity': code_similarity,
            'structure_similarity': structure_similarity,
            'sequence_similarity': sequence_similarity,
            'details': details
        }
    
    def check_plagiarism(self, assignment_id, submission_id):
        """Check a submission against all other submissions for the same assignment."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Get all other submissions for the same assignment
        cursor.execute('''
        SELECT id, student_id
        FROM submissions
        WHERE assignment_id = ? AND id != ?
        ''', (assignment_id, submission_id))
        
        other_submissions = cursor.fetchall()
        conn.close()
        
        results = []
        
        for other_id, other_student in other_submissions:
            comparison = self.compare_submissions(submission_id, other_id)
            
            if comparison:
                results.append({
                    'other_submission_id': other_id,
                    'other_student_id': other_student,
                    'similarity': comparison['overall_similarity'],
                    'comparison_id': comparison['id']
                })
        
        # Sort by similarity, highest first
        results.sort(key=lambda x: x['similarity'], reverse=True)
        
        return results
    
    def get_comparison_details(self, comparison_id):
        """Get detailed information about a comparison."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
        SELECT c.similarity_score, c.details, 
               s1.student_id as student1_id, s1.code_text as code1, 
               s2.student_id as student2_id, s2.code_text as code2
        FROM comparisons c
        JOIN submissions s1 ON c.submission1_id = s1.id
        JOIN submissions s2 ON c.submission2_id = s2.id
        WHERE c.id = ?
        ''', (comparison_id,))
        
        result = cursor.fetchone()
        conn.close()
        
        if not result:
            return None
        
        similarity, details_json, student1, code1, student2, code2 = result
        details = json.loads(details_json)
        
        # Highlight code for display
        highlighted_code1 = self.code_processor.highlight_code(code1)
        highlighted_code2 = self.code_processor.highlight_code(code2)
        
        return {
            'similarity': similarity,
            'student1': student1,
            'student2': student2,
            'code1': code1,
            'code2': code2,
            'highlighted_code1': highlighted_code1,
            'highlighted_code2': highlighted_code2,
            'details': details
        }

# Flask application routes
detector = PlagiarismDetector()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/submit', methods=['POST'])
def submit_code():
    try:
        data = request.json
        assignment_id = data.get('assignment_id')
        student_id = data.get('student_id')
        code = data.get('code')
        
        if not assignment_id or not student_id or not code:
            return jsonify({'error': 'Missing required fields'}), 400
        
        # Add submission to database
        submission_id = detector.add_submission(assignment_id, student_id, code)
        
        # Check for plagiarism
        results = detector.check_plagiarism(assignment_id, submission_id)
        
        # Categorize results
        high_similarity = [r for r in results if r['similarity'] > 0.8]
        medium_similarity = [r for r in results if 0.6 <= r['similarity'] <= 0.8]
        low_similarity = [r for r in results if 0.4 <= r['similarity'] < 0.6]
        
        return jsonify({
            'submission_id': submission_id,
            'plagiarism_check': {
                'high_similarity': high_similarity,
                'medium_similarity': medium_similarity,
                'low_similarity': low_similarity,
                'total_comparisons': len(results)
            }
        })
    
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/compare/<int:comparison_id>')
def get_comparison(comparison_id):
    try:
        details = detector.get_comparison_details(comparison_id)
        
        if not details:
            return jsonify({'error': 'Comparison not found'}), 404
        
        return jsonify(details)
    
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/github_import', methods=['POST'])
def github_import():
    try:
        data = request.json
        repo_url = data.get('repo_url')
        assignment_id = data.get('assignment_id')
        access_token = data.get('access_token')  # Optional
        
        if not repo_url or not assignment_id:
            return jsonify({'error': 'Missing required fields'}), 400
        
        # Extract owner and repo name from URL
        match = re.search(r'github\.com/([^/]+)/([^/]+)', repo_url)
        if not match:
            return jsonify({'error': 'Invalid GitHub repository URL'}), 400
        
        owner, repo_name = match.groups()
        
        # Initialize GitHub client
        g = Github(access_token) if access_token else Github()
        
        # Get repository
        repo = g.get_repo(f"{owner}/{repo_name}")
        
        # Get all Python files
        contents = repo.get_contents("")
        python_files = []
        
        while contents:
            file_content = contents.pop(0)
            if file_content.type == "dir":
                contents.extend(repo.get_contents(file_content.path))
            elif file_content.path.endswith(".py"):
                python_files.append({
                    'path': file_content.path,
                    'content': file_content.decoded_content.decode('utf-8')
                })
        
        # Process each file as a submission
        submission_ids = []
        for file in python_files:
            # Use file path as student ID for demonstration
            student_id = file['path']
            code = file['content']
            
            submission_id = detector.add_submission(assignment_id, student_id, code)
            submission_ids.append({
                'submission_id': submission_id,
                'file_path': file['path']
            })
        
        # Run cross-comparisons
        comparison_results = []
        for i, sub1 in enumerate(submission_ids):
            for sub2 in submission_ids[i+1:]:
                comparison = detector.compare_submissions(sub1['submission_id'], sub2['submission_id'])
                if comparison:
                    comparison_results.append({
                        'file1': sub1['file_path'],
                        'file2': sub2['file_path'],
                        'similarity': comparison['overall_similarity'],
                        'comparison_id': comparison['id']
                    })
        
        # Sort by similarity
        comparison_results.sort(key=lambda x: x['similarity'], reverse=True)
        
        return jsonify({
            'submission_count': len(submission_ids),
            'submissions': submission_ids,
            'comparisons': comparison_results
        })
    
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)
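
Before wiring everything into the web interface, it is worth sanity-checking the detector on two submissions that share logic but use different identifiers. Here is a minimal sketch, assuming the classes above live in plagiarism_detector.py and using a throwaway SQLite database (demo.db, the assignment and student IDs are made up):

# detector_demo.py -- hypothetical smoke test for PlagiarismDetector
from plagiarism_detector import PlagiarismDetector

code_a = """
def total(numbers):
    result = 0
    for n in numbers:
        result += n
    return result
"""

# The same algorithm with every identifier renamed; normalization
# should still flag it as highly similar
code_b = """
def accumulate(values):
    acc = 0
    for item in values:
        acc += item
    return acc
"""

detector = PlagiarismDetector(db_path='demo.db')
id_a = detector.add_submission('hw1', 'student_a', code_a)
id_b = detector.add_submission('hw1', 'student_b', code_b)

comparison = detector.compare_submissions(id_a, id_b)
print(f"Overall similarity: {comparison['overall_similarity']:.2f}")
print(f"  code (TF-IDF):    {comparison['code_similarity']:.2f}")
print(f"  structure:        {comparison['structure_similarity']:.2f}")
print(f"  sequence:         {comparison['sequence_similarity']:.2f}")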


8. Virtual Collaborative Research Laboratory

A futuristic Virtual Collaborative Research Laboratory where scientists and students interact in a digital workspace. The scene includes holographic research data, AI-driven analytics, and a shared virtual whiteboard with project ideas. Researchers wear AR/VR headsets while collaborating remotely. The background features a high-tech lab environment with floating data screens, robotic assistants, and interconnected research hubs.

Difficulty Level: Advanced

Skills Developed: Real-time Communication, Collaboration Tools, Version Control, Data Visualization

Project Description: Develop a virtual collaborative environment where students can work together on research projects in real-time. This platform includes shared code editing, data visualization, video conferencing, and integrated research tools, making remote collaboration seamless for academic projects.

Key Features:

  • Real-time collaborative code editing
  • Integrated version control
  • Interactive data visualization workspace
  • Video conferencing with screen sharing
  • Project management tools
  • Research paper citation and organization

Implementation Steps:

  1. Set up the backend infrastructure with Django or Flask
  2. Implement WebSockets for real-time collaboration (a minimal sketch follows this list)
  3. Integrate a code editor component
  4. Develop the data visualization workspace
  5. Add video conferencing capability
  6. Create the project management interface
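
Since step 2 is the heart of the platform, here is a minimal, self-contained sketch of the real-time layer using Flask-SocketIO, before it appears in the fuller sample below. The event names ('join_project', 'code_update') and payload fields are illustrative assumptions, not a fixed protocol:

# realtime_sketch.py -- minimal Flask-SocketIO collaboration sketch
from flask import Flask
from flask_socketio import SocketIO, emit, join_room

app = Flask(__name__)
app.config['SECRET_KEY'] = 'dev-only-secret'
socketio = SocketIO(app, cors_allowed_origins="*")

@socketio.on('join_project')
def handle_join(data):
    # Each project gets its own room so edits reach only its members
    room = f"project_{data['project_id']}"
    join_room(room)
    emit('user_joined', {'username': data['username']}, to=room)

@socketio.on('code_update')
def handle_code_update(data):
    # Relay an editor change to everyone else in the same project room
    room = f"project_{data['project_id']}"
    emit('code_update', {
        'file_id': data['file_id'],
        'content': data['content'],
        'username': data['username'],
    }, to=room, include_self=False)

if __name__ == '__main__':
    socketio.run(app, debug=True)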

Sample Code Implementation:

# collaborative_lab.py
from flask import Flask, render_template, request, jsonify, session, redirect, url_for
from flask_socketio import SocketIO, emit, join_room, leave_room
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
import os
import json
import uuid
import datetime
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg') # Use Agg backend for headless environments
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
from flask_cors import CORS
import logging
import re

# Configuration
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///collaborative_lab.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50MB max upload
app.config['ALLOWED_EXTENSIONS'] = {'csv', 'xlsx', 'txt', 'json', 'py', 'ipynb', 'pdf', 'docx'}

# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# Initialize extensions
db = SQLAlchemy(app)
migrate = Migrate(app, db)
socketio = SocketIO(app, cors_allowed_origins="*")
CORS(app)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Database Models
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(128))
    full_name = db.Column(db.String(100))
    institution = db.Column(db.String(100))
    created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    projects = db.relationship('Project', secondary='project_members', backref='members')

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

class Project(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    owner_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    owner = db.relationship('User', foreign_keys=[owner_id])
    files = db.relationship('File', backref='project', cascade='all, delete-orphan')
    tasks = db.relationship('Task', backref='project', cascade='all, delete-orphan')
    citations = db.relationship('Citation', backref='project', cascade='all, delete-orphan')

# Association table for many-to-many relationship between users and projects
project_members = db.Table('project_members',
    db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
    db.Column('project_id', db.Integer, db.ForeignKey('project.id'), primary_key=True)
)

class File(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(100), nullable=False)
    file_path = db.Column(db.String(255), nullable=False)
    file_type = db.Column(db.String(20))
    description = db.Column(db.Text)
    uploaded_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    uploaded_by = db.Column(db.Integer, db.ForeignKey('user.id'))
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
    version = db.Column(db.Integer, default=1)

    uploader = db.relationship('User', foreign_keys=[uploaded_by])

class FileVersion(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    file_id = db.Column(db.Integer, db.ForeignKey('file.id'))
    version = db.Column(db.Integer, nullable=False)
    file_path = db.Column(db.String(255), nullable=False)
    committed_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    committed_by = db.Column(db.Integer, db.ForeignKey('user.id'))
    commit_message = db.Column(db.Text)

    file = db.relationship('File', foreign_keys=[file_id])
    committer = db.relationship('User', foreign_keys=[committed_by])

class Task(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    status = db.Column(db.String(20), default='todo')  # todo, in_progress, review, done
    priority = db.Column(db.String(20), default='medium')  # low, medium, high
    created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    due_date = db.Column(db.DateTime)
    assigned_to = db.Column(db.Integer, db.ForeignKey('user.id'))
    created_by = db.Column(db.Integer, db.ForeignKey('user.id'))
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'))

    assignee = db.relationship('User', foreign_keys=[assigned_to])
    creator = db.relationship('User', foreign_keys=[created_by])

class Citation(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(255), nullable=False)
    authors = db.Column(db.Text)
    journal = db.Column(db.String(255))
    year = db.Column(db.Integer)
    doi = db.Column(db.String(100))
    url = db.Column(db.String(255))
    abstract = db.Column(db.Text)
    notes = db.Column(db.Text)
    added_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    added_by = db.Column(db.Integer, db.ForeignKey('user.id'))
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'))

    adder = db.relationship('User', foreign_keys=[added_by])

class ChatMessage(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'))

    user = db.relationship('User', foreign_keys=[user_id])
    project = db.relationship('Project', foreign_keys=[project_id])

# Helper Functions
def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']

def save_file(file, project_id, user_id, description=""):
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        file_type = filename.rsplit('.', 1)[1].lower()

        # Create unique filename
        unique_filename = f"{uuid.uuid4()}_{filename}"
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)

        file.save(file_path)

        # Create file record
        new_file = File(
            filename=filename,
            file_path=file_path,
            file_type=file_type,
            description=description,
            uploaded_by=user_id,
            project_id=project_id
        )

        db.session.add(new_file)
        db.session.commit()

        # Create initial version
        new_version = FileVersion(
            file_id=new_file.id,
            version=1,
            file_path=file_path,
            committed_by=user_id,
            commit_message="Initial upload"
        )

        db.session.add(new_version)
        db.session.commit()

        return new_file

    return None

def update_file(file_id, new_file, user_id, commit_message=""):
    file_record = File.query.get(file_id)

    if not file_record:
        return None

    if new_file and allowed_file(new_file.filename):
        # Create new version
        new_version_num = file_record.version + 1

        # Create unique filename for new version
        filename = secure_filename(file_record.filename)
        unique_filename = f"{uuid.uuid4()}_{filename}"
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)

        new_file.save(file_path)

        # Update file record
        file_record.version = new_version_num
        file_record.file_path = file_path
        file_record.updated_at = datetime.datetime.utcnow()

        # Create version record
        new_version = FileVersion(
            file_id=file_id,
            version=new_version_num,
            file_path=file_path,
            committed_by=user_id,
            commit_message=commit_message
        )

        db.session.add(new_version)
        db.session.commit()

        return file_record

    return None

def get_file_content(file_path, file_type):
    """Get content of a file, possibly with preprocessing for specific types."""
    if not os.path.exists(file_path):
        return None

    if file_type in ['csv', 'txt', 'py']:
        with open(file_path, 'r') as f:
            return f.read()
    elif file_type == 'json':
        with open(file_path, 'r') as f:
            return json.load(f)
    else:
        # For binary files, just indicate they exist but don't return content
        return f"Binary file of type {file_type}"

def generate_visualization(file_path, file_type, vis_type, options):
    """Generate a visualization based on the given file and options."""
    if file_type == 'csv':
        try:
            df = pd.read_csv(file_path)
            return create_visualization(df, vis_type, options)
        except Exception as e:
            return {'error': str(e)}
    elif file_type == 'xlsx':
        try:
            df = pd.read_excel(file_path)
            return create_visualization(df, vis_type, options)
        except Exception as e:
            return {'error': str(e)}
    else:
        return {'error': 'Unsupported file type for visualization'}

def create_visualization(df, vis_type, options):
    """Create a visualization using matplotlib/seaborn."""
    plt.figure(figsize=(10, 6))

    try:
        if vis_type == 'line':
            x = options.get('x')
            y = options.get('y')
            if x and y:
                sns.lineplot(data=df, x=x, y=y)
                plt.title(options.get('title', f"{y} vs {x}"))
            else:
                return {'error': 'Missing x or y axis specification'}

        elif vis_type == 'bar':
            x = options.get('x')
            y = options.get('y')
            if x and y:
                sns.barplot(data=df, x=x, y=y)
                plt.title(options.get('title', f"{y} by {x}"))
            else:
                return {'error': 'Missing x or y axis specification'}

        elif vis_type == 'scatter':
            x = options.get('x')
            y = options.get('y')
            if x and y:
                hue = options.get('hue')
                if hue:
                    sns.scatterplot(data=df, x=x, y=y, hue=hue)
                else:
                    sns.scatterplot(data=df, x=x, y=y)
                plt.title(options.get('title', f"{y} vs {x}"))
            else:
                return {'error': 'Missing x or y axis specification'}

        elif vis_type == 'histogram':
            column = options.get('column')
            if column:
                sns.histplot(data=df, x=column, bins=options.get('bins', 10))
                plt.title(options.get('title', f"Distribution of {column}"))
            else:
                return {'error': 'Missing column specification'}

        elif vis_type == 'heatmap':
            if not options.get('columns'):
                # Use numeric columns for correlation heatmap
                numeric_df = df.select_dtypes(include=[np.number])
                if numeric_df.empty:
                    return {'error': 'No numeric columns for heatmap'}
                corr = numeric_df.corr()
                sns.heatmap(corr, annot=True, cmap='coolwarm')
                plt.title(options.get('title', 'Correlation Heatmap'))
            else:
                # Use specified columns
                columns = options.get('columns')
                pivot_column = options.get('pivot_column')
                value_column = options.get('value_column')

                if pivot_column and value_column:
                    pivot_table = df.pivot(index=columns[0], columns=pivot_column, values=value_column)
                    sns.heatmap(pivot_table, annot=True, cmap='coolwarm')
                    plt.title(options.get('title', f'Heatmap of {value_column}'))
                else:
                    return {'error': 'Missing pivot or value column specification'}

        else:
            return {'error': 'Unsupported visualization type'}

        # Save figure to a bytes buffer
        buf = io.BytesIO()
        plt.savefig(buf, format='png')
        buf.seek(0)

        # Convert to base64 for embedding in HTML
        data = base64.b64encode(buf.read()).decode('utf-8')
        plt.close()

        return {
            'image': f'data:image/png;base64,{data}',
            'type': vis_type
        }

    except Exception as e:
        plt.close()
        return {'error': str(e)}

# API Routes
@app.route('/')
def home():
    if 'user_id' in session:
        return redirect(url_for('dashboard'))
    return render_template('index.html')

@app.route('/api/register', methods=['POST'])
def register():
    data = request.json

    # Check if user already exists
    existing_user = User.query.filter_by(email=data['email']).first()
    if existing_user:
        return jsonify({'error': 'Email already registered'}), 400

    existing_username = User.query.filter_by(username=data['username']).first()
    if existing_username:
        return jsonify({'error': 'Username already taken'}), 400

    # Create new user
    new_user = User(
        username=data['username'],
        email=data['email'],
        full_name=data.get('full_name', ''),
        institution=data.get('institution', '')
    )
    new_user.set_password(data['password'])

    db.session.add(new_user)
    db.session.commit()

    return jsonify({'message': 'User registered successfully', 'user_id': new_user.id})

@app.route('/api/login', methods=['POST'])
def login():
    data = request.json

    user = User.query.filter_by(username=data['username']).first()

    if not user or not user.check_password(data['password']):
        return jsonify({'error': 'Invalid username or password'}), 401

    session['user_id'] = user.id

    return jsonify({
        'message': 'Login successful',
        'user': {
            'id': user.id,
            'username': user.username,
            'email': user.email,
            'full_name': user.full_name
        }
    })

@app.route('/api/logout', methods=['POST'])
def logout():
    session.pop('user_id', None)
    return jsonify({'message': 'Logout successful'})

@app.route('/api/projects', methods=['GET'])
def get_projects():
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    user = User.query.get(session['user_id'])

    if not user:
        return jsonify({'error': 'User not found'}), 404

    # Get projects where user is either owner or member
    owned_projects = Project.query.filter_by(owner_id=user.id).all()
    member_projects = user.projects

    all_projects = set(owned_projects + member_projects)

    projects_data = []
    for project in all_projects:
        projects_data.append({
            'id': project.id,
            'name': project.name,
            'description': project.description,
            'created_at': project.created_at.isoformat(),
            'updated_at': project.updated_at.isoformat(),
            'owner': {
                'id': project.owner.id,
                'username': project.owner.username
            } if project.owner else None,
            'is_owner': project.owner_id == user.id,
            'member_count': len(project.members)
        })

    return jsonify(projects_data)

@app.route('/api/projects', methods=['POST'])
def create_project():
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    data = request.json

    new_project = Project(
        name=data['name'],
        description=data.get('description', ''),
        owner_id=session['user_id']
    )

    # Add owner as a member
    user = User.query.get(session['user_id'])
    new_project.members.append(user)

    db.session.add(new_project)
    db.session.commit()

    return jsonify({
        'message': 'Project created successfully',
        'project': {
            'id': new_project.id,
            'name': new_project.name,
            'description': new_project.description
        }
    })

@app.route('/api/projects/<int:project_id>', methods=['GET'])
def get_project(project_id):
    if 'user_id' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    project = Project.query.get(project_id)

    if not project:
        return jsonify({'error': 'Project not found'}), 404

    # Check if user is a member
    user = User.query.get(session['user_id'])
    if user not in project.members and project.owner_id != user.id:
        return jsonify({'error': 'Access denied'}), 403

    members_data = []
    for member in project.members:
        members_data.append({
            'id': member.id,
            'username': member.username,
            'full_name': member.full_name,
            'is_owner': member.id == project.owner_id
        })

    files_data = []
    for file in project.files:
        files_data.append({
            'id': file.id,
            'filename': file.filename,
            'file_type': file.file_type,
            'description': file.description,
            'uploaded_at': file.uploaded_at.isoformat(),
            'updated_at': file.updated_at.isoformat(),
            'version': file.version,
            'uploader': {
                'id': file.uploader.id,
                'username': file.uploader.username
            } if file.uploader else None
        })

    tasks_data = []
    for task in project.tasks:
        tasks_data.append({
            'id': task.id,
            'title': task.title,
            'description': task.description,
            'status': task.status,
            'priority': task.priority,
            'created_at': task.created_at.isoformat(),
            'due_date': task.due_date.isoformat() if task.due_date else None,
            'assignee': {
                'id': task.assignee.id,
                'username': task.assignee.username
            } if task.assignee else None,
            'creator': {
                'id': task.creator.id,
                'username': task.creator.username
            } if task.creator else None
        })

    citations_data = []
    for citation in project.citations:
        citations_data.append({
            'id': citation.id,
            'title': citation.title,
            'authors': citation.authors,
            'journal': citation.journal,
            'year': citation.year,
            'doi': citation.doi,
            'url': citation.url
        })

    project_data = {
        'id': project.id,
        'name': project.name,
        'description': project.description,
        'created_at': project.created_at.isoformat(),
        'updated_at': project.updated_at.isoformat(),
        'owner': {
            'id': project.owner.id,
            'username': project.owner.username,
            'full_name': project.owner.full_name
        } if project.owner else None,
        'members': members_data,
        'files': files_data,
        'tasks': tasks_data,
        'citations': citations_data,
        'is_owner': project.owner_id == user.id
    }

    return jsonify(project_data)

@app.route('/api/projects/<int:project_id>/members', methods=['POST'])
def add_member(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

project = Project.query.get(project_id)

if not project:
return jsonify({'error': 'Project not found'}), 404

# Verify user is project owner
if project.owner_id != session['user_id']:
return jsonify({'error': 'Only the project owner can add members'}), 403

data = request.json
username = data.get('username')
email = data.get('email')

# Find user by username or email
if username:
new_member = User.query.filter_by(username=username).first()
elif email:
new_member = User.query.filter_by(email=email).first()
else:
return jsonify({'error': 'Username or email required'}), 400

if not new_member:
return jsonify({'error': 'User not found'}), 404

# Check if user is already a member
if new_member in project.members:
return jsonify({'error': 'User is already a project member'}), 400

# Add member
project.members.append(new_member)
db.session.commit()

return jsonify({
'message': 'Member added successfully',
'member': {
'id': new_member.id,
'username': new_member.username,
'full_name': new_member.full_name
}
})

@app.route('/api/projects/<int:project_id>/members/<int:user_id>', methods=['DELETE'])
def remove_member(project_id, user_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

project = Project.query.get(project_id)

if not project:
return jsonify({'error': 'Project not found'}), 404

# Only project owner can remove members
if project.owner_id != session['user_id']:
return jsonify({'error': 'Only the project owner can remove members'}), 403

# Owner cannot be removed
if user_id == project.owner_id:
return jsonify({'error': 'Owner cannot be removed from project'}), 400

member = User.query.get(user_id)

if not member:
return jsonify({'error': 'User not found'}), 404

if member not in project.members:
return jsonify({'error': 'User is not a project member'}), 400

# Remove member
project.members.remove(member)
db.session.commit()

return jsonify({'message': 'Member removed successfully'})

@app.route('/api/projects/<int:project_id>/files', methods=['POST'])
def upload_file_route(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

project = Project.query.get(project_id)

if not project:
return jsonify({'error': 'Project not found'}), 404

# Check if user is a member
user = User.query.get(session['user_id'])
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403

if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400

file = request.files['file']

if file.filename == '':
return jsonify({'error': 'No selected file'}), 400

description = request.form.get('description', '')

new_file = save_file(file, project_id, user.id, description)

if not new_file:
return jsonify({'error': 'File upload failed'}), 400

return jsonify({
'message': 'File uploaded successfully',
'file': {
'id': new_file.id,
'filename': new_file.filename,
'file_type': new_file.file_type,
'description': new_file.description,
'uploaded_at': new_file.uploaded_at.isoformat(),
'version': new_file.version
}
})

@app.route('/api/files/<int:file_id>', methods=['GET'])
def get_file(file_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

file_record = File.query.get(file_id)

if not file_record:
return jsonify({'error': 'File not found'}), 404

# Check if user is a project member
user = User.query.get(session['user_id'])
project = Project.query.get(file_record.project_id)

if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403

# Get file versions
versions = FileVersion.query.filter_by(file_id=file_id).order_by(FileVersion.version.desc()).all()

versions_data = []
for version in versions:
versions_data.append({
'version': version.version,
'committed_at': version.committed_at.isoformat(),
'committer': {
'id': version.committer.id,
'username': version.committer.username
} if version.committer else None,
'commit_message': version.commit_message
})

# Get file content
content = get_file_content(file_record.file_path, file_record.file_type)

file_data = {
'id': file_record.id,
'filename': file_record.filename,
'file_type': file_record.file_type,
'description': file_record.description,
'uploaded_at': file_record.uploaded_at.isoformat(),
'updated_at': file_record.updated_at.isoformat(),
'version': file_record.version,
'uploader': {
'id': file_record.uploader.id,
'username': file_record.uploader.username
} if file_record.uploader else None,
'project_id': file_record.project_id,
'versions': versions_data,
'content': content
}

return jsonify(file_data)

@app.route('/api/files/<int:file_id>', methods=['PUT'])
def update_file_route(file_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

file_record = File.query.get(file_id)

if not file_record:
return jsonify({'error': 'File not found'}), 404

# Check if user is a project member
user = User.query.get(session['user_id'])
project = Project.query.get(file_record.project_id)

if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403

if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400

file = request.files['file']

if file.filename == '':
return jsonify({'error': 'No selected file'}), 400

commit_message = request.form.get('commit_message', '')

updated_file = update_file(file_id, file, user.id, commit_message)

if not updated_file:
return jsonify({'error': 'File update failed'}), 400

return jsonify({
'message': 'File updated successfully',
'file': {
'id': updated_file.id,
'filename': updated_file.filename,
'file_type': updated_file.file_type,
'updated_at': updated_file.updated_at.isoformat(),
'version': updated_file.version
}
})

@app.route('/api/projects/<int:project_id>/tasks', methods=['POST'])
def create_task(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

project = Project.query.get(project_id)

if not project:
return jsonify({'error': 'Project not found'}), 404

# Check if user is a project member
user = User.query.get(session['user_id'])
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403

data = request.json

# Parse due date if provided
due_date = None
if data.get('due_date'):
try:
due_date = datetime.datetime.fromisoformat(data['due_date'])
except ValueError:
return jsonify({'error': 'Invalid date format'}), 400

# Create task
new_task = Task(
title=data['title'],
description=data.get('description', ''),
status=data.get('status', 'todo'),
priority=data.get('priority', 'medium'),
due_date=due_date,
assigned_to=data.get('assigned_to'),
created_by=user.id,
project_id=project_id
)

db.session.add(new_task)
db.session.commit()

return jsonify({
'message': 'Task created successfully',
'task': {
'id': new_task.id,
'title': new_task.title,
'status': new_task.status,
'priority': new_task.priority
}
})

@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

task = Task.query.get(task_id)

if not task:
return jsonify({'error': 'Task not found'}), 404

# Check if user is a project member
user = User.query.get(session['user_id'])
project = Project.query.get(task.project_id)

if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403

data = request.json

# Update task fields
if 'title' in data:
task.title = data['title']

if 'description' in data:
task.description = data['description']

if 'status' in data:
task.status = data['status']

if 'priority' in data:
task.priority = data['priority']

if 'assigned_to' in data:
task.assigned_to = data['assigned_to']

if 'due_date' in data:
if data['due_date']:
try:
task.due_date = datetime.datetime.fromisoformat(data['due_date'])
except ValueError:
return jsonify({'error': 'Invalid date format'}), 400
else:
task.due_date = None

db.session.commit()

return jsonify({
'message': 'Task updated successfully',
'task': {
'id': task.id,
'title': task.title,
'status': task.status,
'priority': task.priority,
'assigned_to': task.assigned_to
}
})

@app.route('/api/projects/<int:project_id>/citations', methods=['POST'])
def add_citation(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

project = Project.query.get(project_id)

if not project:
return jsonify({'error': 'Project not found'}), 404

# Check if user is a project member
user = User.query.get(session['user_id'])
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403

data = request.json

# Create citation
new_citation = Citation(
title=data['title'],
authors=data.get('authors', ''),
journal=data.get('journal', ''),
year=data.get('year'),
doi=data.get('doi', ''),
url=data.get('url', ''),
abstract=data.get('abstract', ''),
notes=data.get('notes', ''),
added_by=user.id,
project_id=project_id
)

db.session.add(new_citation)
db.session.commit()

return jsonify({
'message': 'Citation added successfully',
'citation': {
'id': new_citation.id,
'title': new_citation.title,
'authors': new_citation.authors,
'year': new_citation.year
}
})

@app.route('/api/files/<int:file_id>/visualize', methods=['POST'])
def visualize_file(file_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

file_record = File.query.get(file_id)

if not file_record:
return jsonify({'error': 'File not found'}), 404

# Check if user is a project member
user = User.query.get(session['user_id'])
project = Project.query.get(file_record.project_id)

if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403

data = request.json
vis_type = data.get('type', 'line')
options = data.get('options', {})

visualization = generate_visualization(file_record.file_path, file_record.file_type, vis_type, options)

if 'error' in visualization:
return jsonify({'error': visualization['error']}), 400

return jsonify(visualization)

# WebSocket handlers for real-time collaboration
@socketio.on('connect')
def handle_connect():
if 'user_id' not in session:
return False # Reject connection if not authenticated

logger.info(f"Client connected: {request.sid}")

@socketio.on('join')
def handle_join(data):
if 'user_id' not in session:
return

room = data.get('room')
if not room:
return

# Check if user has access to the project
project_id = int(room.replace('project_', ''))
user = User.query.get(session['user_id'])
project = Project.query.get(project_id)

if not project or (user not in project.members and project.owner_id != user.id):
return

join_room(room)

# Notify others in the room
emit('user_joined', {
'user_id': user.id,
'username': user.username
}, room=room, include_self=False)

logger.info(f"User {user.username} joined room {room}")

@socketio.on('leave')
def handle_leave(data):
if 'user_id' not in session:
return

room = data.get('room')
if not room:
return

leave_room(room)

# Notify others in the room
user = User.query.get(session['user_id'])
emit('user_left', {
'user_id': user.id,
'username': user.username
}, room=room, include_self=False)

logger.info(f"User {user.username} left room {room}")

@socketio.on('editor_update')
def handle_editor_update(data):
if 'user_id' not in session:
return

room = data.get('room')
file_id = data.get('file_id')
content = data.get('content')
cursor_position = data.get('cursor_position')

if not room or not file_id:
return

# Check if user has access to the file
user = User.query.get(session['user_id'])
file_record = File.query.get(file_id)

if not file_record:
return

project = Project.query.get(file_record.project_id)

if not project or (user not in project.members and project.owner_id != user.id):
return

# Forward update to other users in the room
emit('editor_update', {
'user_id': user.id,
'username': user.username,
'file_id': file_id,
'content': content,
'cursor_position': cursor_position,
'timestamp': datetime.datetime.utcnow().isoformat()
}, room=room, include_self=False)

@socketio.on('chat_message')
def handle_chat_message(data):
if 'user_id' not in session:
return

room = data.get('room')
message = data.get('message')

if not room or not message:
return

# Check if user has access to the project
project_id = int(room.replace('project_', ''))
user = User.query.get(session['user_id'])
project = Project.query.get(project_id)

if not project or (user not in project.members and project.owner_id != user.id):
return

# Save message to database
new_message = ChatMessage(
content=message,
user_id=user.id,
project_id=project_id
)

db.session.add(new_message)
db.session.commit()

# Forward message to all users in the room
emit('chat_message', {
'id': new_message.id,
'user_id': user.id,
'username': user.username,
'message': message,
'timestamp': new_message.timestamp.isoformat()
}, room=room)

@app.route('/api/projects/<int:project_id>/messages', methods=['GET'])
def get_chat_history(project_id):
if 'user_id' not in session:
return jsonify({'error': 'Unauthorized'}), 401

project = Project.query.get(project_id)

if not project:
return jsonify({'error': 'Project not found'}), 404

# Check if user is a project member
user = User.query.get(session['user_id'])
if user not in project.members and project.owner_id != user.id:
return jsonify({'error': 'Access denied'}), 403

# Get chat messages
limit = request.args.get('limit', 50, type=int)
offset = request.args.get('offset', 0, type=int)

messages = ChatMessage.query.filter_by(project_id=project_id)\
.order_by(ChatMessage.timestamp.desc())\
.limit(limit).offset(offset).all()

messages_data = []
for message in reversed(messages): # Reverse to get chronological order
messages_data.append({
'id': message.id,
'user_id': message.user_id,
'username': message.user.username if message.user else 'Unknown',
'message': message.content,
'timestamp': message.timestamp.isoformat()
})

return jsonify(messages_data)

if __name__ == '__main__':
with app.app_context():
db.create_all()
socketio.run(app, debug=True)
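
Once the models and helper functions defined earlier are in place, you can smoke-test the API from a second terminal. Here is a minimal sketch using the requests and python-socketio client libraries (both are extra pip installs; the base URL, filename, and credentials are placeholders, not part of the project above):

# try_api.py - quick manual test of the collaboration API (hypothetical data)
import requests
import socketio

BASE = 'http://localhost:5000'  # default Flask-SocketIO dev server address

s = requests.Session()  # a Session keeps the login cookie between calls
s.post(f'{BASE}/api/register', json={
    'username': 'ada', 'email': 'ada@example.com', 'password': 'secret123'
})
s.post(f'{BASE}/api/login', json={'username': 'ada', 'password': 'secret123'})

# Create a project, then list every project we own or belong to
project = s.post(f'{BASE}/api/projects', json={'name': 'Thesis Experiments'}).json()['project']
print(s.get(f'{BASE}/api/projects').json())

# Real-time events go over Socket.IO; forward the session cookie so the
# server-side session checks in the handlers above pass
sio = socketio.Client()
cookie = '; '.join(f'{c.name}={c.value}' for c in s.cookies)
sio.connect(BASE, headers={'Cookie': cookie})
sio.emit('join', {'room': f"project_{project['id']}"})
sio.emit('chat_message', {'room': f"project_{project['id']}", 'message': 'Hello, team!'})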

Resources:

9. Personalized Learning Path Generator

A futuristic digital learning platform showcasing a Personalized Learning Path Generator. The interface displays an AI-powered dashboard that customizes study plans based on student progress and preferences. A student interacts with a holographic screen showing adaptive learning modules, skill progression charts, and AI-driven recommendations. The background features a modern tech-enhanced study environment with digital books, interactive lessons, and a smart assistant guiding the learning journey.

Difficulty Level: Intermediate

Skills Developed: Recommendation Systems, Educational Technology, Graph Algorithms, User Experience Design

Project Description: Build a system that creates personalized learning paths for students based on their goals, current skills, and learning preferences. This application maps out educational resources, courses, and projects in an optimal sequence to help students efficiently acquire new skills.

Key Features:

  • Skill assessment quizzes
  • Learning style identification
  • Custom learning path generation
  • Resource recommendation from multiple sources
  • Progress tracking and adaptive paths
  • Peer learning group recommendations

Implementation Steps:

  1. Design the knowledge graph representing skills and resources
  2. Implement assessment algorithms
  3. Develop path-finding algorithms for learning sequences (see the sketch after this list)
  4. Create recommendation engines for resources
  5. Build a user interface using React with a Python backend
  6. Implement progress tracking and adaptation systems
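
Step 3 is the core of the generator: once skills and their prerequisites form a directed graph, any valid learning sequence is simply a topological order of the skills the learner still needs. A minimal sketch with networkx (the skill names here are illustrative; the full version appears in SkillGraph.find_learning_path below):

# path_sketch.py - step 3 in isolation: a learning order is a topological sort
import networkx as nx

g = nx.DiGraph()
# An edge A -> B means "A is a prerequisite of B" (illustrative skills)
g.add_edges_from([
    ('Python Basics', 'Data Analysis'),
    ('Python Basics', 'ML Basics'),
    ('Data Analysis', 'ML Basics'),
    ('ML Basics', 'Neural Networks'),
])

target = 'Neural Networks'
needed = nx.ancestors(g, target) | {target}  # the target plus everything it depends on
order = [skill for skill in nx.topological_sort(g) if skill in needed]
print(order)  # prerequisites always come before the skills that need them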

Sample Code Implementation:

# learning_path_generator.py
from flask import Flask, jsonify, request, session
from flask_cors import CORS
import pandas as pd
import numpy as np
import networkx as nx
import json
import os
import sqlite3
import hashlib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import datetime
import requests
from surprise import Dataset, Reader, SVD
from surprise.model_selection import train_test_split
from collections import defaultdict

app = Flask(__name__)
app.secret_key = 'your-secret-key'
CORS(app)

# Database setup
def init_db():
    conn = sqlite3.connect('learning_path.db')
    cursor = conn.cursor()
    
    # Users table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        email TEXT UNIQUE NOT NULL,
        password_hash TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''')
    
    # User profiles table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS user_profiles (
        user_id INTEGER PRIMARY KEY,
        learning_style TEXT,
        interests TEXT,
        goals TEXT,
        background TEXT,
        preferences TEXT,
        last_updated TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users (id)
    )
    ''')
    
    # Skills table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS skills (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL,
        category TEXT,
        description TEXT,
        difficulty INTEGER
    )
    ''')
    
    # Resources table (courses, tutorials, projects, etc.)
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS resources (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        description TEXT,
        resource_type TEXT,
        url TEXT,
        provider TEXT,
        duration INTEGER,
        difficulty INTEGER,
        rating REAL,
        tags TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''')
    
    # Resource-Skill relationship
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS resource_skills (
        resource_id INTEGER,
        skill_id INTEGER,
        relationship_type TEXT,
        weight REAL,
        PRIMARY KEY (resource_id, skill_id),
        FOREIGN KEY (resource_id) REFERENCES resources (id),
        FOREIGN KEY (skill_id) REFERENCES skills (id)
    )
    ''')
    
    # Skill prerequisites
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS skill_prerequisites (
        skill_id INTEGER,
        prerequisite_id INTEGER,
        strength REAL,
        PRIMARY KEY (skill_id, prerequisite_id),
        FOREIGN KEY (skill_id) REFERENCES skills (id),
        FOREIGN KEY (prerequisite_id) REFERENCES skills (id)
    )
    ''')
    
    # User skill assessments
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS user_skills (
        user_id INTEGER,
        skill_id INTEGER,
        proficiency_level REAL,
        assessment_date TIMESTAMP,
        PRIMARY KEY (user_id, skill_id),
        FOREIGN KEY (user_id) REFERENCES users (id),
        FOREIGN KEY (skill_id) REFERENCES skills (id)
    )
    ''')
    
    # Learning paths
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS learning_paths (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER,
        name TEXT,
        description TEXT,
        goal_skills TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        completed_at TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users (id)
    )
    ''')
    
    # Learning path steps
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS path_steps (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        path_id INTEGER,
        step_number INTEGER,
        resource_id INTEGER,
        status TEXT DEFAULT 'not_started',
        completed_at TIMESTAMP,
        FOREIGN KEY (path_id) REFERENCES learning_paths (id),
        FOREIGN KEY (resource_id) REFERENCES resources (id)
    )
    ''')
    
    # Resource ratings by users
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS resource_ratings (
        user_id INTEGER,
        resource_id INTEGER,
        rating REAL,
        review TEXT,
        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (user_id, resource_id),
        FOREIGN KEY (user_id) REFERENCES users (id),
        FOREIGN KEY (resource_id) REFERENCES resources (id)
    )
    ''')
    
    # Progress tracking
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS user_progress (
        user_id INTEGER,
        resource_id INTEGER,
        status TEXT,
        progress REAL,
        started_at TIMESTAMP,
        completed_at TIMESTAMP,
        notes TEXT,
        PRIMARY KEY (user_id, resource_id),
        FOREIGN KEY (user_id) REFERENCES users (id),
        FOREIGN KEY (resource_id) REFERENCES resources (id)
    )
    ''')
    
    # Learning groups
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS learning_groups (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        description TEXT,
        skill_focus TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''')
    
    # Group members
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS group_members (
        group_id INTEGER,
        user_id INTEGER,
        joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (group_id, user_id),
        FOREIGN KEY (group_id) REFERENCES learning_groups (id),
        FOREIGN KEY (user_id) REFERENCES users (id)
    )
    ''')
    
    conn.commit()
    conn.close()

# Initialize the database
init_db()

# Load sample data for demonstration
def load_sample_data():
    conn = sqlite3.connect('learning_path.db')
    cursor = conn.cursor()
    
    # Check if data already exists
    cursor.execute("SELECT COUNT(*) FROM skills")
    if cursor.fetchone()[0] > 0:
        conn.close()
        return
    
    # Sample skills
    skills = [
        (1, 'Python Basics', 'Programming', 'Fundamental Python programming concepts', 1),
        (2, 'Data Structures', 'Programming', 'Arrays, lists, stacks, queues, trees, graphs', 2),
        (3, 'Algorithms', 'Programming', 'Sorting, searching, dynamic programming', 3),
        (4, 'Git', 'Tools', 'Version control with Git', 1),
        (5, 'SQL', 'Database', 'Structured Query Language for databases', 2),
        (6, 'Machine Learning Basics', 'Data Science', 'Introduction to ML concepts', 3),
        (7, 'Neural Networks', 'Data Science', 'Deep learning and neural networks', 4),
        (8, 'Web Development', 'Web', 'HTML, CSS, JavaScript basics', 2),
        (9, 'Flask', 'Web', 'Python web framework', 3),
        (10, 'React', 'Web', 'Frontend JavaScript library', 3),
        (11, 'Data Analysis', 'Data Science', 'Data cleaning, exploration, visualization', 2),
        (12, 'Natural Language Processing', 'Data Science', 'Text processing and analysis', 4),
        (13, 'Computer Vision', 'Data Science', 'Image and video analysis', 4),
        (14, 'DevOps Basics', 'Operations', 'CI/CD, deployment, containers', 3),
        (15, 'Cloud Computing', 'Operations', 'AWS, Azure, GCP basics', 3)
    ]
    
    cursor.executemany('''
    INSERT OR IGNORE INTO skills (id, name, category, description, difficulty)
    VALUES (?, ?, ?, ?, ?)
    ''', skills)
    
    # Sample skill prerequisites
    prerequisites = [
        (6, 1, 0.9),  # ML Basics requires Python Basics
        (6, 2, 0.7),  # ML Basics requires Data Structures
        (6, 11, 0.8),  # ML Basics requires Data Analysis
        (7, 6, 0.9),  # Neural Networks requires ML Basics
        (9, 1, 0.8),  # Flask requires Python Basics
        (9, 8, 0.7),  # Flask requires Web Development
        (11, 1, 0.8),  # Data Analysis requires Python Basics
        (12, 6, 0.8),  # NLP requires ML Basics
        (12, 11, 0.7),  # NLP requires Data Analysis
        (13, 6, 0.8),  # Computer Vision requires ML Basics
        (14, 4, 0.6)   # DevOps requires Git
    ]
    
    cursor.executemany('''
    INSERT OR IGNORE INTO skill_prerequisites (skill_id, prerequisite_id, strength)
    VALUES (?, ?, ?)
    ''', prerequisites)
    
    # Sample resources
    resources = [
        (1, 'Python for Beginners', 'Comprehensive Python course for beginners', 'course', 
         'https://example.com/python-beginners', 'Codecademy', 20, 1, 4.5, 'python,programming,beginners'),
        (2, 'Data Structures in Python', 'Learn essential data structures', 'course', 
         'https://example.com/python-data-structures', 'Udacity', 30, 2, 4.2, 'python,data-structures,algorithms'),
        (3, 'Introduction to Algorithms', 'Algorithm design and analysis', 'book', 
         'https://example.com/intro-algorithms', 'MIT Press', 60, 3, 4.8, 'algorithms,computer-science'),
        (4, 'Git & GitHub Crash Course', 'Quick introduction to Git', 'tutorial', 
         'https://example.com/git-crash-course', 'YouTube', 2, 1, 4.7, 'git,github,version-control'),
        (5, 'SQL for Data Analysis', 'Using SQL for data work', 'course', 
         'https://example.com/sql-data-analysis', 'DataCamp', 15, 2, 4.4, 'sql,database,data-analysis'),
        (6, 'Machine Learning Foundations', 'Introduction to ML concepts and techniques', 'course', 
         'https://example.com/ml-foundations', 'Coursera', 40, 3, 4.6, 'machine-learning,data-science,python'),
        (7, 'Deep Learning Specialization', 'Comprehensive deep learning course', 'specialization', 
         'https://example.com/deep-learning', 'Coursera', 80, 4, 4.9, 'deep-learning,neural-networks,ai'),
        (8, 'Web Development Bootcamp', 'Complete web dev course', 'bootcamp', 
         'https://example.com/web-bootcamp', 'Udemy', 60, 2, 4.5, 'web,html,css,javascript'),
        (9, 'Flask Web Development', 'Building web applications with Flask', 'course', 
         'https://example.com/flask-dev', 'Real Python', 25, 3, 4.3, 'flask,python,web-development'),
        (10, 'React - The Complete Guide', 'Comprehensive React tutorial', 'course', 
         'https://example.com/react-guide', 'Udemy', 40, 3, 4.7, 'react,javascript,frontend')
    ]
    
    cursor.executemany('''
    INSERT OR IGNORE INTO resources (id, title, description, resource_type, url, provider, duration, difficulty, rating, tags)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', resources)
    
    # Sample resource-skill relationships
    resource_skills = [
        (1, 1, 'teaches', 0.9),    # Python for Beginners teaches Python Basics
        (2, 2, 'teaches', 0.9),    # Data Structures in Python teaches Data Structures
        (3, 3, 'teaches', 0.9),    # Intro to Algorithms teaches Algorithms
        (4, 4, 'teaches', 0.9),    # Git Crash Course teaches Git
        (5, 5, 'teaches', 0.9),    # SQL for Data Analysis teaches SQL
        (6, 6, 'teaches', 0.9),    # ML Foundations teaches ML Basics
        (7, 7, 'teaches', 0.9),    # Deep Learning Specialization teaches Neural Networks
        (8, 8, 'teaches', 0.9),    # Web Development Bootcamp teaches Web Development
        (9, 9, 'teaches', 0.9),    # Flask Web Development teaches Flask
        (10, 10, 'teaches', 0.9),  # React Guide teaches React
        (6, 1, 'requires', 0.7),   # ML Foundations requires Python Basics
        (6, 11, 'teaches', 0.5),   # ML Foundations partially teaches Data Analysis
        (7, 6, 'requires', 0.8),   # Deep Learning requires ML Basics
        (9, 1, 'requires', 0.7),   # Flask Web Development requires Python Basics
        (9, 8, 'requires', 0.5)    # Flask Web Development somewhat requires Web Development
    ]
    
    cursor.executemany('''
    INSERT OR IGNORE INTO resource_skills (resource_id, skill_id, relationship_type, weight)
    VALUES (?, ?, ?, ?)
    ''', resource_skills)
    
    conn.commit()
    conn.close()

# Load sample data
load_sample_data()

class SkillGraph:
    """Represents the knowledge graph of skills and their relationships."""
    
    def __init__(self):
        self.graph = nx.DiGraph()
        self.load_from_database()
    
    def load_from_database(self):
        """Load skills and prerequisites from the database."""
        conn = sqlite3.connect('learning_path.db')
        
        # Load skills
        skills_df = pd.read_sql("SELECT id, name, category, difficulty FROM skills", conn)
        
        # Add nodes to the graph
        for _, skill in skills_df.iterrows():
            self.graph.add_node(
                skill['id'],
                name=skill['name'],
                category=skill['category'],
                difficulty=skill['difficulty']
            )
        
        # Load prerequisites
        prereqs_df = pd.read_sql(
            "SELECT skill_id, prerequisite_id, strength FROM skill_prerequisites", 
            conn
        )
        
        # Add edges to the graph
        for _, prereq in prereqs_df.iterrows():
            self.graph.add_edge(
                prereq['prerequisite_id'],
                prereq['skill_id'],
                weight=prereq['strength']
            )
        
        conn.close()
    
    def get_prerequisites(self, skill_id, recursive=False):
        """Get prerequisites for a skill."""
        if not recursive:
            # Direct prerequisites only
            return list(self.graph.predecessors(skill_id))
        else:
            # All ancestors (recursive prerequisites)
            return list(nx.ancestors(self.graph, skill_id))
    
    def get_dependent_skills(self, skill_id, recursive=False):
        """Get skills that depend on this skill."""
        if not recursive:
            # Direct dependents only
            return list(self.graph.successors(skill_id))
        else:
            # All descendants (recursive dependents)
            return list(nx.descendants(self.graph, skill_id))
    
    def find_shortest_path(self, from_skill_id, to_skill_id):
        """Find the shortest path between two skills."""
        try:
            path = nx.shortest_path(self.graph, from_skill_id, to_skill_id)
            return path
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            return None
    
    def find_learning_path(self, user_skills, target_skills):
        """
        Find an optimal learning path from user's current skills to target skills.
        
        Parameters:
        - user_skills: dict mapping skill_id to proficiency level (0-1)
        - target_skills: list of skill_ids to learn
        
        Returns:
        - ordered list of skill_ids representing the learning path
        """
        # Create a subgraph of skills the user hasn't mastered yet
        unmastered_graph = self.graph.copy()
        
        # Remove skills the user has already mastered (proficiency > 0.7)
        for skill_id, proficiency in user_skills.items():
            if proficiency > 0.7 and skill_id in unmastered_graph:
                unmastered_graph.remove_node(skill_id)
        
        learning_path = []
        visited = set()
        
        # Find path to each target skill
        for target in target_skills:
            if target in visited:
                continue
            
            # Find prerequisites for the target skill
            prereqs = self.get_prerequisites(target, recursive=True)
            
            # Sort prerequisites by their topological order
            if prereqs:
                # Get subgraph containing only these prerequisites
                prereq_graph = nx.subgraph(self.graph, prereqs + [target])
                try:
                    # Get topological sort (respects prerequisites)
                    topo_sort = list(nx.topological_sort(prereq_graph))
                    
                    # Filter out skills that are already mastered or visited
                    filtered_path = [s for s in topo_sort if (
                        s not in user_skills or user_skills[s] <= 0.7
                    ) and s not in visited]
                    
                    learning_path.extend(filtered_path)
                    visited.update(filtered_path)
                except nx.NetworkXUnfeasible:
                    # Graph has cycles, use a different approach
                    for prereq in sorted(prereqs, key=lambda x: self.graph.nodes[x]['difficulty']):
                        if prereq not in visited and (prereq not in user_skills or user_skills[prereq] <= 0.7):
                            learning_path.append(prereq)
                            visited.add(prereq)
            
            # Add the target skill itself if not mastered
            if target not in visited and (target not in user_skills or user_skills[target] <= 0.7):
                learning_path.append(target)
                visited.add(target)
        
        return learning_path
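
# Example (illustrative ids from the sample data below): a user who has
# mastered Python Basics (1) and Git (4) and wants Neural Networks (7):
#
#   graph = SkillGraph()
#   graph.find_learning_path({1: 0.9, 4: 0.8}, target_skills=[7])
#   # -> prerequisites such as [2, 11, 6] first, then 7 itself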

class ResourceRecommender:
    """Recommends learning resources based on skills and user preferences."""
    
    def __init__(self):
        self.vectorizer = TfidfVectorizer(stop_words='english')
        self.load_resources()
    
    def load_resources(self):
        """Load resources from the database."""
        conn = sqlite3.connect('learning_path.db')
        
        # Load resources
        self.resources_df = pd.read_sql("""
            SELECT r.id, r.title, r.description, r.resource_type, r.provider, 
                   r.duration, r.difficulty, r.rating, r.tags, rs.skill_id, rs.relationship_type, rs.weight
            FROM resources r
            JOIN resource_skills rs ON r.id = rs.resource_id
        """, conn)
        
        # Prepare content for recommendation
        self.resources_df['content'] = (
            self.resources_df['title'] + ' ' + 
            self.resources_df['description'] + ' ' + 
            self.resources_df['tags']
        )
        
        # Compute TF-IDF matrix
        if len(self.resources_df) > 0:
            self.tfidf_matrix = self.vectorizer.fit_transform(self.resources_df['content'])
        else:
            self.tfidf_matrix = None
        
        conn.close()
    
    def recommend_for_skill(self, skill_id, user_id=None, n=3):
        """Recommend resources for learning a specific skill."""
        # Filter resources that teach this skill
        skill_resources = self.resources_df[
            (self.resources_df['skill_id'] == skill_id) & 
            (self.resources_df['relationship_type'] == 'teaches')
        ]
        
        if len(skill_resources) == 0:
            return []
        
        # If we have user_id, use their ratings for personalized recommendations
        if user_id:
            conn = sqlite3.connect('learning_path.db')
            user_ratings = pd.read_sql("""
                SELECT resource_id, rating
                FROM resource_ratings
                WHERE user_id = ?
            """, conn, params=(user_id,))
            conn.close()
            
            if len(user_ratings) > 0:
                # Use collaborative filtering if we have enough ratings
                if len(user_ratings) >= 5:
                    return self._collaborative_recommendations(user_id, skill_id, n)
                
                # Otherwise use a hybrid approach
                return self._hybrid_recommendations(user_id, skill_id, n)
        
        # If no user_id or not enough ratings, use content-based
        return self._content_based_recommendations(skill_id, n)
    
    def _content_based_recommendations(self, skill_id, n=3):
        """Generate content-based recommendations."""
        # Filter resources for this skill
        skill_resources = self.resources_df[
            (self.resources_df['skill_id'] == skill_id) & 
            (self.resources_df['relationship_type'] == 'teaches')
        ].copy()  # copy the slice so adding the 'score' column below is safe
        
        if len(skill_resources) <= n:
            return skill_resources.sort_values('rating', ascending=False)[['id', 'title', 'resource_type', 'difficulty', 'rating']].to_dict('records')
        
        # Sort by a combination of rating and teaching weight
        skill_resources['score'] = skill_resources['rating'] * skill_resources['weight']
        
        return skill_resources.sort_values('score', ascending=False).head(n)[['id', 'title', 'resource_type', 'difficulty', 'rating']].to_dict('records')
    
    def _collaborative_recommendations(self, user_id, skill_id, n=3):
        """Generate collaborative filtering recommendations."""
        conn = sqlite3.connect('learning_path.db')
        
        # Get all ratings
        ratings_df = pd.read_sql("""
            SELECT user_id, resource_id, rating
            FROM resource_ratings
        """, conn)
        
        # Get resources for this skill
        skill_resources = pd.read_sql("""
            SELECT r.id, r.title, r.resource_type, r.difficulty, r.rating
            FROM resources r
            JOIN resource_skills rs ON r.id = rs.resource_id
            WHERE rs.skill_id = ? AND rs.relationship_type = 'teaches'
        """, conn, params=(skill_id,))
        
        conn.close()
        
        if len(ratings_df) < 10 or len(skill_resources) == 0:
            # Not enough data for collaborative filtering
            return self._content_based_recommendations(skill_id, n)
        
        # Prepare data for Surprise
        reader = Reader(rating_scale=(1, 5))
        data = Dataset.load_from_df(ratings_df[['user_id', 'resource_id', 'rating']], reader)
        
        # Create a training set
        trainset = data.build_full_trainset()
        
        # Train the algorithm
        algo = SVD()
        algo.fit(trainset)
        
        # Predict ratings for resources this user hasn't rated
        user_ratings = ratings_df[ratings_df['user_id'] == user_id]['resource_id'].tolist()
        skill_resource_ids = skill_resources['id'].tolist()
        
        predictions = []
        for resource_id in skill_resource_ids:
            if resource_id not in user_ratings:
                pred = algo.predict(user_id, resource_id)
                predictions.append((resource_id, pred.est))
        
        # Sort by predicted rating
        predictions.sort(key=lambda x: x[1], reverse=True)
        top_resources = [p[0] for p in predictions[:n]]
        
        # Get resource details
        recommended_resources = skill_resources[skill_resources['id'].isin(top_resources)]
        
        # Re-sort by predicted rating order
        recommended_resources = recommended_resources.set_index('id').loc[top_resources].reset_index()
        
        return recommended_resources.to_dict('records')
    
    def _hybrid_recommendations(self, user_id, skill_id, n=3):
        """Combine content-based and collaborative filtering approaches."""
        # Start with content-based recommendations
        content_recs = self._content_based_recommendations(skill_id, n)
        
        # Get user preferences
        conn = sqlite3.connect('learning_path.db')
        user_profile = pd.read_sql("""
            SELECT preferences
            FROM user_profiles
            WHERE user_id = ?
        """, conn, params=(user_id,))

        user_ratings = pd.read_sql("""
            SELECT r.resource_type, r.difficulty, r.provider, rr.rating
            FROM resource_ratings rr
            JOIN resources r ON rr.resource_id = r.id
            WHERE rr.user_id = ?
        """, conn, params=(user_id,))
        
        conn.close()
        
        # If no user data, return content-based only
        if len(user_profile) == 0 and len(user_ratings) == 0:
            return content_recs
        
        # Analyze user preferences
        preferences = {}
        
        if len(user_profile) > 0 and user_profile.iloc[0]['preferences']:
            try:
                preferences = json.loads(user_profile.iloc[0]['preferences'])
            except (json.JSONDecodeError, TypeError):
                preferences = {}
        
        if len(user_ratings) > 0:
            # Extract preferred resource types
            type_ratings = user_ratings.groupby('resource_type')['rating'].mean()
            difficulty_ratings = user_ratings.groupby('difficulty')['rating'].mean()
            provider_ratings = user_ratings.groupby('provider')['rating'].mean()
            
            preferences['preferred_types'] = type_ratings[type_ratings > 3.5].index.tolist()
            preferences['preferred_difficulty'] = difficulty_ratings.idxmax()
            preferences['preferred_providers'] = provider_ratings[provider_ratings > 4.0].index.tolist()
        
        # Get additional resources for this skill
        conn = sqlite3.connect('learning_path.db')
        all_skill_resources = pd.read_sql("""
            SELECT r.id, r.title, r.resource_type, r.difficulty, r.rating, r.provider
            FROM resources r
            JOIN resource_skills rs ON r.id = rs.resource_id
            WHERE rs.skill_id = ? AND rs.relationship_type = 'teaches'
        """, conn, params=(skill_id,))
        conn.close()
        
        # Filter based on preferences
        filtered_resources = all_skill_resources.copy()
        
        if 'preferred_types' in preferences and preferences['preferred_types']:
            type_mask = filtered_resources['resource_type'].isin(preferences['preferred_types'])
            if type_mask.sum() > 0:
                filtered_resources = filtered_resources[type_mask]
        
        if 'preferred_providers' in preferences and preferences['preferred_providers']:
            provider_mask = filtered_resources['provider'].isin(preferences['preferred_providers'])
            if provider_mask.sum() > 0:
                filtered_resources = filtered_resources[provider_mask]
        
        if 'preferred_difficulty' in preferences:
            diff = preferences['preferred_difficulty']
            # Prefer resources with difficulty close to preferred level
            filtered_resources['diff_distance'] = abs(filtered_resources['difficulty'] - diff)
            filtered_resources = filtered_resources.sort_values('diff_distance')
        
        # Combine with content-based recommendations
        content_rec_ids = [r['id'] for r in content_recs]
        additional_recs = filtered_resources[~filtered_resources['id'].isin(content_rec_ids)]
        
        # Add more recommendations if needed
        if len(content_recs) < n and len(additional_recs) > 0:
            additional_recs = additional_recs.sort_values('rating', ascending=False)
            num_additional = min(n - len(content_recs), len(additional_recs))
            
            for i in range(num_additional):
                rec = additional_recs.iloc[i]
                content_recs.append({
                    'id': rec['id'],
                    'title': rec['title'],
                    'resource_type': rec['resource_type'],
                    'difficulty': rec['difficulty'],
                    'rating': rec['rating']
                })
        
        return content_recs
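
# Example: top resources for ML Basics (skill 6); passing a user_id lets the
# recommender personalise from that user's past ratings (1 is illustrative):
#
#   recommender = ResourceRecommender()
#   recommender.recommend_for_skill(skill_id=6, user_id=1, n=3)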

class LearningPathGenerator:
    """Main class for generating personalized learning paths."""
    
    def __init__(self):
        self.skill_graph = SkillGraph()
        self.recommender = ResourceRecommender()
    
    def assess_user_skills(self, user_id):
        """Get current user skill levels from the database."""
        conn = sqlite3.connect('learning_path.db')
        
        user_skills_df = pd.read_sql("""
            SELECT skill_id, proficiency_level
            FROM user_skills
            WHERE user_id = ?
        """, conn, params=(user_id,))
        
        conn.close()
        
        user_skills = {}
        for _, row in user_skills_df.iterrows():
            user_skills[row['skill_id']] = row['proficiency_level']
        
        return user_skills
    
    def get_user_profile(self, user_id):
        """Get user profile with learning preferences."""
        conn = sqlite3.connect('learning_path.db')
        
        profile_df = pd.read_sql("""
            SELECT learning_style, interests, goals, background, preferences
            FROM user_profiles
            WHERE user_id = ?
        """, conn, params=(user_id,))
        
        conn.close()
        
        if len(profile_df) == 0:
            return {}
        
        profile = profile_df.iloc[0].to_dict()
        
        # Parse JSON fields
        for field in ['interests', 'goals', 'preferences']:
            if profile[field] and isinstance(profile[field], str):
                try:
                    profile[field] = json.loads(profile[field])
                except json.JSONDecodeError:
                    profile[field] = {}
        
        return profile
    
    def generate_learning_path(self, user_id, target_skills, name=None, description=None):
        """
        Generate a personalized learning path for a user.
        
        Parameters:
        - user_id: ID of the user
        - target_skills: List of skill IDs the user wants to learn
        - name: Optional name for the learning path
        - description: Optional description for the learning path
        
        Returns:
        - The created learning path with steps
        """
        # Get user's current skills
        user_skills = self.assess_user_skills(user_id)
        
        # Get user's learning profile
        user_profile = self.get_user_profile(user_id)
        
        # Find optimal skill sequence
        skill_sequence = self.skill_graph.find_learning_path(user_skills, target_skills)
        
        if not skill_sequence:
            return {"error": "Could not generate a learning path for the target skills"}
        
        # Create a new learning path
        conn = sqlite3.connect('learning_path.db')
        cursor = conn.cursor()
        
        path_name = name or f"Learning Path {datetime.datetime.now().strftime('%Y-%m-%d')}"
        path_desc = description or f"Path to learn {', '.join(map(str, target_skills))}"
        target_skills_json = json.dumps(target_skills)
        
        cursor.execute("""
            INSERT INTO learning_paths (user_id, name, description, goal_skills)
            VALUES (?, ?, ?, ?)
        """, (user_id, path_name, path_desc, target_skills_json))
        
        path_id = cursor.lastrowid
        
        # Generate resource recommendations for each skill
        step_number = 1
        path_steps = []
        
        for skill_id in skill_sequence:
            # Get skill info
            cursor.execute("SELECT name FROM skills WHERE id = ?", (skill_id,))
            skill_name = cursor.fetchone()[0]
            
            # Get recommended resources
            resources = self.recommender.recommend_for_skill(skill_id, user_id, n=2)
            
            if resources:
                for resource in resources:
                    # Add as a path step
                    cursor.execute("""
                        INSERT INTO path_steps (path_id, step_number, resource_id, status)
                        VALUES (?, ?, ?, 'not_started')
                    """, (path_id, step_number, resource['id']))
                    
                    # Get resource details
                    cursor.execute("""
                        SELECT title, resource_type, difficulty, duration, url
                        FROM resources
                        WHERE id = ?
                    """, (resource['id'],))
                    
                    resource_details = cursor.fetchone()
                    title, resource_type, difficulty, duration, url = resource_details
                    
                    path_steps.append({
                        'step_number': step_number,
                        'skill_id': skill_id,
                        'skill_name': skill_name,
                        'resource_id': resource['id'],
                        'resource_title': title,
                        'resource_type': resource_type,
                        'difficulty': difficulty,
                        'duration': duration,
                        'url': url,
                        'status': 'not_started'
                    })
                    
                    step_number += 1
        
        conn.commit()
        conn.close()
        
        return {
            'path_id': path_id,
            'name': path_name,
            'description': path_desc,
            'target_skills': target_skills,
            'steps': path_steps
        }
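
    # Example: generate a path toward Flask (skill 9) for user 1 and print
    # the ordered steps (ids refer to the sample data loaded above):
    #
    #   generator = LearningPathGenerator()
    #   path = generator.generate_learning_path(user_id=1, target_skills=[9])
    #   for step in path['steps']:
    #       print(step['step_number'], step['skill_name'], step['resource_title'])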
    
    def recommend_learning_groups(self, user_id, skill_id=None):
        """Recommend learning groups for a user based on skills and interests."""
        conn = sqlite3.connect('learning_path.db')
        
        # Get all learning groups
        groups_df = pd.read_sql("""
            SELECT lg.id, lg.name, lg.description, lg.skill_focus, COUNT(gm.user_id) as member_count
            FROM learning_groups lg
            LEFT JOIN group_members gm ON lg.id = gm.group_id
            GROUP BY lg.id
        """, conn)
        
        # If no groups exist, return empty list
        if len(groups_df) == 0:
            conn.close()
            return []
        
        # If a specific skill is provided, filter for that skill
        if skill_id:
            skill_focused_groups = groups_df[groups_df['skill_focus'].apply(
                lambda x: str(skill_id) in x.split(',') if x else False
            )]
            
            if len(skill_focused_groups) > 0:
                recommended_groups = skill_focused_groups.sort_values('member_count', ascending=False).head(3)
                conn.close()
                return recommended_groups.to_dict('records')
        
        # Get user's interests and skills
        user_profile = self.get_user_profile(user_id)
        user_skills = self.assess_user_skills(user_id)
        
        # Get user's learning paths
        paths_df = pd.read_sql("""
            SELECT goal_skills
            FROM learning_paths
            WHERE user_id = ?
        """, conn, params=(user_id,))
        
        conn.close()
        
        # Extract skills of interest
        target_skills = []
        
        if 'interests' in user_profile and user_profile['interests']:
            # Check if interests include skills
            interests = user_profile['interests']
            if isinstance(interests, list):
                # Assume these are skill names or IDs
                target_skills.extend(interests)
        
        if len(paths_df) > 0:
            # Extract target skills from learning paths
            for _, path in paths_df.iterrows():
                if path['goal_skills']:
                    try:
                        goals = json.loads(path['goal_skills'])
                        if isinstance(goals, list):
                            target_skills.extend(goals)
                    except (json.JSONDecodeError, TypeError):
                        pass
        
        # If we have target skills, recommend groups based on those
        if target_skills:
            # Convert to set to remove duplicates
            target_skills = set(target_skills)
            
            # Filter groups by skill focus
            skill_matches = []
            for _, group in groups_df.iterrows():
                if not group['skill_focus']:
                    continue
                
                group_skills = set(str(group['skill_focus']).split(','))
                match_score = len(target_skills.intersection(group_skills))
                if match_score > 0:
                    skill_matches.append((group, match_score))
            
            if skill_matches:
                # Sort by match score and then by member count
                skill_matches.sort(key=lambda x: (-x[1], -x[0]['member_count']))
                # Convert the pandas rows to dicts so the return type matches
                # the .to_dict('records') results used elsewhere
                recommended_groups = [match[0].to_dict() for match in skill_matches[:3]]
                return recommended_groups
        
        # If no matches or no target skills, return most active groups
        return groups_df.sort_values('member_count', ascending=False).head(3).to_dict('records')
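
    # Example: suggest up to three study groups, optionally focused on one
    # skill (the sample data seeds no groups, so this returns [] until some
    # are created):
    #
    #   generator.recommend_learning_groups(user_id=1, skill_id=6)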
    
    def update_learning_path(self, path_id, updates):
        """Update a learning path based on user progress and feedback."""
        conn = sqlite3.connect('learning_path.db')
        cursor = conn.cursor()
        
        # Get current path information
        cursor.execute("""
            SELECT lp.user_id, lp.goal_skills, ps.id, ps.resource_id, ps.status
            FROM learning_paths lp
            JOIN path_steps ps ON lp.id = ps.path_id
            WHERE lp.id = ?
        """, (path_id,))
        
        path_data = cursor.fetchall()
        
        if not path_data:
            conn.close()
            return {"error": "Learning path not found"}
        
        user_id = path_data[0][0]
        goal_skills = json.loads(path_data[0][1]) if path_data[0][1] else []
        
        # Update step statuses
        for step_update in updates.get('step_updates', []):
            step_id = step_update.get('step_id')
            new_status = step_update.get('status')
            
            if step_id and new_status:
                cursor.execute("""
                    UPDATE path_steps
                    SET status = ?, completed_at = ?
                    WHERE id = ? AND path_id = ?
                """, (
                    new_status, 
                    datetime.datetime.now() if new_status == 'completed' else None,
                    step_id, path_id
                ))
        
        # If user completed resources, update their skill proficiency
        completed_resources = [u['step_id'] for u in updates.get('step_updates', []) 
                              if u.get('status') == 'completed']
        
        if completed_resources:
            # Get resource-skill relationships for completed resources
            placeholders = ','.join(['?'] * len(completed_resources))
            cursor.execute(f"""
                SELECT rs.skill_id, rs.weight, ps.resource_id
                FROM path_steps ps
                JOIN resource_skills rs ON ps.resource_id = rs.resource_id
                WHERE ps.id IN ({placeholders}) AND rs.relationship_type = 'teaches'
            """, completed_resources)
            
            skill_progress = {}
            for skill_id, weight, resource_id in cursor.fetchall():
                if skill_id not in skill_progress:
                    skill_progress[skill_id] = 0
                
                # Add weighted contribution to skill proficiency
                skill_progress[skill_id] += float(weight) * 0.2  # Each resource contributes up to 20% progress
            
            # Update user skills
            for skill_id, progress in skill_progress.items():
                # Get current proficiency
                cursor.execute("""
                    SELECT proficiency_level
                    FROM user_skills
                    WHERE user_id = ? AND skill_id = ?
                """, (user_id, skill_id))
                
                result = cursor.fetchone()
                current_proficiency = result[0] if result else 0
                
                # Calculate new proficiency (capped at 1.0)
                new_proficiency = min(1.0, current_proficiency + progress)
                
                # Update or insert
                if result:
                    cursor.execute("""
                        UPDATE user_skills
                        SET proficiency_level = ?, assessment_date = ?
                        WHERE user_id = ? AND skill_id = ?
                    """, (new_proficiency, datetime.datetime.now(), user_id, skill_id))
                else:
                    cursor.execute("""
                        INSERT INTO user_skills (user_id, skill_id, proficiency_level, assessment_date)
                        VALUES (?, ?, ?, ?)
                    """, (user_id, skill_id, new_proficiency, datetime.datetime.now()))
        
        # Check if user has achieved their goals
        if goal_skills:
            # Get user's current skills
            cursor.execute("""
                SELECT skill_id, proficiency_level
                FROM user_skills
                WHERE user_id = ? AND skill_id IN ({})
            """.format(','.join(['?'] * len(goal_skills))), [user_id] + goal_skills)
            
            achieved_skills = {}
            for skill_id, proficiency in cursor.fetchall():
                achieved_skills[skill_id] = proficiency >= 0.7  # Skill is considered achieved if proficiency >= 70%
            
            # Check if all goals are achieved
            all_achieved = all(achieved_skills.get(skill_id, False) for skill_id in goal_skills)
            
            if all_achieved:
                # Mark path as completed
                cursor.execute("""
                    UPDATE learning_paths
                    SET completed_at = ?
                    WHERE id = ?
                """, (datetime.datetime.now(), path_id))
                
                # Suggest a new path for skills that build on the achieved ones
                next_level_skills = []
                for skill_id in goal_skills:
                    next_skills = self.skill_graph.get_dependent_skills(skill_id)
                    for next_skill in next_skills:
                        # Check if user already mastered this skill
                        cursor.execute("""
                            SELECT proficiency_level
                            FROM user_skills
                            WHERE user_id = ? AND skill_id = ?
                        """, (user_id, next_skill))
                        
                        result = cursor.fetchone()
                        if not result or result[0] < 0.7:
                            next_level_skills.append(next_skill)
                
                # Remove duplicates
                next_level_skills = list(set(next_level_skills))
                
                conn.commit()
                conn.close()
                
                if next_level_skills:
                    # Generate a suggested next path
                    suggested_path = self.generate_learning_path(
                        user_id=user_id,
                        target_skills=next_level_skills[:3],  # Take up to 3 skills
                        name="Next Level Path",
                        description="Suggested path to advance your skills"
                    )
                    
                    return {
                        'message': 'Learning path completed successfully',
                        'all_goals_achieved': True,
                        'suggested_next_path': suggested_path
                    }
                
                return {
                    'message': 'Learning path completed successfully',
                    'all_goals_achieved': True
                }
        
        conn.commit()
        conn.close()
        
        return {
            'message': 'Learning path updated successfully',
            'all_goals_achieved': False
        }
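
# Illustrative call to update_learning_path -- 'step_updates' is the only key the
# method reads; statuses other than 'completed' are stored as-is (the step IDs
# below are placeholder values):
#
#   LearningPathGenerator().update_learning_path(path_id=1, updates={
#       'step_updates': [
#           {'step_id': 12, 'status': 'completed'},
#           {'step_id': 13, 'status': 'in_progress'},
#       ]
#   })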

# API Routes
@app.route('/api/login', methods=['POST'])
def login():
    data = request.json
    username = data.get('username')
    password = data.get('password')
    
    if not username or not password:
        return jsonify({'error': 'Username and password required'}), 400
    
    conn = sqlite3.connect('learning_path.db')
    cursor = conn.cursor()
    
    # Get user
    cursor.execute("SELECT id, password_hash FROM users WHERE username = ?", (username,))
    user = cursor.fetchone()
    
    conn.close()
    
    # Compare against the stored demo hash (see the note in /api/register)
    if not user or user[1] != hashlib.sha256(password.encode()).hexdigest():
        return jsonify({'error': 'Invalid username or password'}), 401
    
    session['user_id'] = user[0]
    
    return jsonify({'message': 'Login successful', 'user_id': user[0]})

@app.route('/api/register', methods=['POST'])
def register():
    data = request.json
    username = data.get('username')
    email = data.get('email')
    password = data.get('password')
    
    if not username or not email or not password:
        return jsonify({'error': 'All fields required'}), 400
    
    conn = sqlite3.connect('learning_path.db')
    cursor = conn.cursor()
    
    # Check if user exists
    cursor.execute("SELECT id FROM users WHERE username = ? OR email = ?", (username, email))
    if cursor.fetchone():
        conn.close()
        return jsonify({'error': 'Username or email already exists'}), 400
    
    # Create user (unsalted SHA-256 keeps the demo simple; prefer a salted hash such as
    # werkzeug.security.generate_password_hash in a real deployment)
    password_hash = hashlib.sha256(password.encode()).hexdigest()
    
    cursor.execute("""
        INSERT INTO users (username, email, password_hash)
        VALUES (?, ?, ?)
    """, (username, email, password_hash))
    
    user_id = cursor.lastrowid
    
    conn.commit()
    conn.close()
    
    session['user_id'] = user_id
    
    return jsonify({'message': 'Registration successful', 'user_id': user_id})

@app.route('/api/profile', methods=['GET', 'POST'])
def user_profile():
    if 'user_id' not in session:
        return jsonify({'error': 'Not logged in'}), 401
    
    user_id = session['user_id']
    
    if request.method == 'GET':
        conn = sqlite3.connect('learning_path.db')
        
        # Get user info
        cursor = conn.cursor()
        cursor.execute("SELECT username, email FROM users WHERE id = ?", (user_id,))
        user_data = cursor.fetchone()
        
        if not user_data:
            conn.close()
            return jsonify({'error': 'User not found'}), 404
        
        # Get profile
        cursor.execute("""
            SELECT learning_style, interests, goals, background, preferences
            FROM user_profiles
            WHERE user_id = ?
        """, (user_id,))
        
        profile_data = cursor.fetchone()
        
        # Get skills
        cursor.execute("""
            SELECT us.skill_id, s.name, us.proficiency_level
            FROM user_skills us
            JOIN skills s ON us.skill_id = s.id
            WHERE us.user_id = ?
        """, (user_id,))
        
        skills_data = cursor.fetchall()
        
        conn.close()
        
        # Format response
        profile = {
            'username': user_data[0],
            'email': user_data[1],
            'profile': {
                'learning_style': profile_data[0] if profile_data else None,
                'interests': json.loads(profile_data[1]) if profile_data and profile_data[1] else [],
                'goals': json.loads(profile_data[2]) if profile_data and profile_data[2] else [],
                'background': profile_data[3] if profile_data else None,
                'preferences': json.loads(profile_data[4]) if profile_data and profile_data[4] else {}
            },
            'skills': [
                {'id': s[0], 'name': s[1], 'proficiency': s[2]}
                for s in skills_data
            ]
        }
        
        return jsonify(profile)
    
    elif request.method == 'POST':
        data = request.json
        
        conn = sqlite3.connect('learning_path.db')
        cursor = conn.cursor()
        
        # Check if profile exists
        cursor.execute("SELECT user_id FROM user_profiles WHERE user_id = ?", (user_id,))
        profile_exists = cursor.fetchone() is not None
        
        # Prepare profile data
        learning_style = data.get('learning_style')
        interests = json.dumps(data.get('interests', []))
        goals = json.dumps(data.get('goals', []))
        background = data.get('background')
        preferences = json.dumps(data.get('preferences', {}))
        
        # Update or insert profile
        if profile_exists:
            cursor.execute("""
                UPDATE user_profiles
                SET learning_style = ?, interests = ?, goals = ?, background = ?, preferences = ?, last_updated = ?
                WHERE user_id = ?
            """, (learning_style, interests, goals, background, preferences, datetime.datetime.now(), user_id))
        else:
            cursor.execute("""
                INSERT INTO user_profiles (user_id, learning_style, interests, goals, background, preferences, last_updated)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (user_id, learning_style, interests, goals, background, preferences, datetime.datetime.now()))
        
        # Handle skills update if provided
        if 'skills' in data:
            skills = data['skills']
            for skill in skills:
                skill_id = skill.get('id')
                proficiency = skill.get('proficiency', 0)
                
                cursor.execute("""
                    INSERT OR REPLACE INTO user_skills (user_id, skill_id, proficiency_level, assessment_date)
                    VALUES (?, ?, ?, ?)
                """, (user_id, skill_id, proficiency, datetime.datetime.now()))
        
        conn.commit()
        conn.close()
        
        return jsonify({'message': 'Profile updated successfully'})

@app.route('/api/skills', methods=['GET'])
def get_skills():
    conn = sqlite3.connect('learning_path.db')
    
    skills_df = pd.read_sql("""
        SELECT id, name, category, description, difficulty
        FROM skills
        ORDER BY category, difficulty, name
    """, conn)
    
    conn.close()
    
    # Group by category
    grouped_skills = {}
    for _, skill in skills_df.iterrows():
        category = skill['category']
        if category not in grouped_skills:
            grouped_skills[category] = []
        
        grouped_skills[category].append({
            'id': skill['id'],
            'name': skill['name'],
            'description': skill['description'],
            'difficulty': skill['difficulty']
        })
    
    return jsonify(grouped_skills)

@app.route('/api/skills/<int:skill_id>/prerequisites', methods=['GET'])
def get_skill_prerequisites(skill_id):
    # Create skill graph
    skill_graph = SkillGraph()
    
    # Get prerequisites
    prereqs = skill_graph.get_prerequisites(skill_id)
    
    # Get skill details
    conn = sqlite3.connect('learning_path.db')
    
    if prereqs:
        prereqs_str = ','.join('?' * len(prereqs))
        prereqs_df = pd.read_sql(f"""
            SELECT id, name, category, difficulty
            FROM skills
            WHERE id IN ({prereqs_str})
        """, conn, params=prereqs)
        
        prereqs_data = prereqs_df.to_dict('records')
    else:
        prereqs_data = []
    
    # Get skill information
    skill_df = pd.read_sql("""
        SELECT id, name, category, description, difficulty
        FROM skills
        WHERE id = ?
    """, conn, params=(skill_id,))
    
    conn.close()
    
    if len(skill_df) == 0:
        return jsonify({'error': 'Skill not found'}), 404
    
    skill_data = skill_df.iloc[0].to_dict()
    skill_data['prerequisites'] = prereqs_data
    
    return jsonify(skill_data)

@app.route('/api/learning-paths', methods=['GET', 'POST'])
def learning_paths():
    if 'user_id' not in session:
        return jsonify({'error': 'Not logged in'}), 401
    
    user_id = session['user_id']
    
    if request.method == 'GET':
        conn = sqlite3.connect('learning_path.db')
        
        paths_df = pd.read_sql(f"""
            SELECT id, name, description, goal_skills, created_at, updated_at
            FROM learning_paths
            WHERE user_id = {user_id}
            ORDER BY updated_at DESC
        """, conn)
        
        # Get completion stats for each path
        paths_data = []
        for _, path in paths_df.iterrows():
            path_id = path['id']
            
            # Get steps count and completion
            steps_df = pd.read_sql(f"""
                SELECT status, COUNT(*) as count
                FROM path_steps
                WHERE path_id = {path_id}
                GROUP BY status
            """, conn)
            
            total_steps = steps_df['count'].sum()
            completed_steps = steps_df[steps_df['status'] == 'completed']['count'].sum() if 'completed' in steps_df['status'].values else 0
            
            # Parse goal skills
            goal_skills = []
            if path['goal_skills']:
                try:
                    skill_ids = json.loads(path['goal_skills'])
                    
                    # Get skill names
                    if skill_ids:
                        skills_str = ','.join('?' * len(skill_ids))
                        skills_df = pd.read_sql(f"""
                            SELECT id, name
                            FROM skills
                            WHERE id IN ({skills_str})
                        """, conn, params=skill_ids)
                        
                        goal_skills = skills_df.to_dict('records')
                except (ValueError, TypeError):
                    goal_skills = []
            
            paths_data.append({
                'id': path['id'],
                'name': path['name'],
                'description': path['description'],
                'goal_skills': goal_skills,
                'created_at': path['created_at'],
                'updated_at': path['updated_at'],
                'total_steps': total_steps,
                'completed_steps': completed_steps,
                'progress': (completed_steps / total_steps * 100) if total_steps > 0 else 0
            })
        
        conn.close()
        
        return jsonify(paths_data)
    
    elif request.method == 'POST':
        data = request.json
        target_skills = data.get('target_skills', [])
        name = data.get('name')
        description = data.get('description')
        
        if not target_skills:
            return jsonify({'error': 'Target skills required'}), 400
        
        # Generate learning path
        generator = LearningPathGenerator()
        path = generator.generate_learning_path(user_id, target_skills, name, description)
        
        return jsonify(path)

@app.route('/api/learning-paths/<int:path_id>', methods=['GET', 'PUT'])
def learning_path_detail(path_id):
    if 'user_id' not in session:
        return jsonify({'error': 'Not logged in'}), 401
    
    user_id = session['user_id']
    
    conn = sqlite3.connect('learning_path.db')
    
    # Check if path exists and belongs to user
    cursor = conn.cursor()
    cursor.execute("""
        SELECT id
        FROM learning_paths
        WHERE id = ? AND user_id = ?
    """, (path_id, user_id))
    
    path_exists = cursor.fetchone() is not None
    
    if not path_exists:
        conn.close()
        return jsonify({'error': 'Learning path not found'}), 404
    
    if request.method == 'GET':
        # Get path details
        path_df = pd.read_sql(f"""
            SELECT id, name, description, goal_skills, created_at, updated_at
            FROM learning_paths
            WHERE id = {path_id}
        """, conn)
        
        # Get steps
        steps_df = pd.read_sql(f"""
            SELECT ps.id, ps.step_number, ps.resource_id, ps.status, ps.completed_at,
                   r.title, r.description, r.resource_type, r.difficulty, r.duration, r.url, r.provider,
                   rs.skill_id
            FROM path_steps ps
            JOIN resources r ON ps.resource_id = r.id
            LEFT JOIN resource_skills rs ON r.id = rs.resource_id AND rs.relationship_type = 'teaches'
            WHERE ps.path_id = {path_id}
            ORDER BY ps.step_number
        """, conn)
        
        # Get skill information for goal skills and step skills
        goal_skills = json.loads(path_df.iloc[0]['goal_skills']) if path_df.iloc[0]['goal_skills'] else []
        step_skills = steps_df['skill_id'].dropna().unique().tolist()
        
        all_skills = list(set(goal_skills + step_skills))
        
        if all_skills:
            skills_str = ','.join('?' * len(all_skills))
            skills_df = pd.read_sql(f"""
                SELECT id, name, category
                FROM skills
                WHERE id IN ({skills_str})
            """, conn, params=all_skills)
            
            # Create skill lookup
            skill_lookup = {row['id']: row.to_dict() for _, row in skills_df.iterrows()}
        else:
            skill_lookup = {}
        
        # Format steps with skill info
        steps_data = []
        for _, step in steps_df.iterrows():
            skill_id = step['skill_id']
            skill_info = skill_lookup.get(skill_id, {}) if skill_id else {}
            
            steps_data.append({
                'id': step['id'],
                'step_number': step['step_number'],
                'resource': {
                    'id': step['resource_id'],
                    'title': step['title'],
                    'description': step['description'],
                    'type': step['resource_type'],
                    'difficulty': step['difficulty'],
                    'duration': step['duration'],
                    'url': step['url'],
                    'provider': step['provider']
                },
                'skill': skill_info,
                'status': step['status'],
                'completed_at': step['completed_at']
            })
        
        # Format goal skills
        goal_skills_data = [skill_lookup.get(skill_id, {'id': skill_id}) for skill_id in goal_skills]
        
        path_data = {
            'id': path_df.iloc[0]['id'],
            'name': path_df.iloc[0]['name'],
            'description': path_df.iloc[0]['description'],
            'goal_skills': goal_skills_data,
            'created_at': path_df.iloc[0]['created_at'],
            'updated_at': path_df.iloc[0]['updated_at'],
            'steps': steps_data
        }
        
        conn.close()
        
        return jsonify(path_data)
    
    elif request.method == 'PUT':
        data = request.json
        
        # Update learning path
        generator = LearningPathGenerator()
        result = generator.update_learning_path(path_id, data)
        
        return jsonify(result)

@app.route('/api/resources/<int:resource_id>/rate', methods=['POST'])
def rate_resource(resource_id):
    if 'user_id' not in session:
        return jsonify({'error': 'Not logged in'}), 401
    
    user_id = session['user_id']
    data = request.json
    
    rating = data.get('rating')
    review = data.get('review', '')
    
    if not rating or not isinstance(rating, (int, float)) or rating < 1 or rating > 5:
        return jsonify({'error': 'Invalid rating'}), 400
    
    conn = sqlite3.connect('learning_path.db')
    cursor = conn.cursor()
    
    # Check if resource exists
    cursor.execute("SELECT id FROM resources WHERE id = ?", (resource_id,))
    if not cursor.fetchone():
        conn.close()
        return jsonify({'error': 'Resource not found'}), 404
    
    # Add or update rating
    cursor.execute("""
        INSERT OR REPLACE INTO resource_ratings (user_id, resource_id, rating, review, timestamp)
        VALUES (?, ?, ?, ?, ?)
    """, (user_id, resource_id, rating, review, datetime.datetime.now()))
    
    # Update average rating in resources table
    cursor.execute("""
        UPDATE resources
        SET rating = (
            SELECT AVG(rating)
            FROM resource_ratings
            WHERE resource_id = ?
        )
        WHERE id = ?
    """, (resource_id, resource_id))
    
    conn.commit()
    conn.close()
    
    return jsonify({'message': 'Rating submitted successfully'})

@app.route('/api/learning-groups/join/<int:group_id>', methods=['POST'])
def join_group(group_id):
    if 'user_id' not in session:
        return jsonify({'error': 'Not logged in'}), 401
    
    user_id = session['user_id']
    
    conn = sqlite3.connect('learning_path.db')
    cursor = conn.cursor()
    
    # Check if group exists
    cursor.execute("SELECT id FROM learning_groups WHERE id = ?", (group_id,))
    if not cursor.fetchone():
        conn.close()
        return jsonify({'error': 'Group not found'}), 404
    
    # Check if user is already a member
    cursor.execute("""
        SELECT group_id
        FROM group_members
        WHERE group_id = ? AND user_id = ?
    """, (group_id, user_id))
    
    if cursor.fetchone():
        conn.close()
        return jsonify({'error': 'Already a member of this group'}), 400
    
    # Add user to group
    cursor.execute("""
        INSERT INTO group_members (group_id, user_id)
        VALUES (?, ?)
    """, (group_id, user_id))
    
    conn.commit()
    conn.close()
    
    return jsonify({'message': 'Successfully joined the group'})

@app.route('/api/skill-assessment', methods=['POST'])
def assess_skills():
    if 'user_id' not in session:
        return jsonify({'error': 'Not logged in'}), 401
    
    user_id = session['user_id']
    data = request.json
    
    if not data or not isinstance(data, list):
        return jsonify({'error': 'Invalid data format'}), 400
    
    conn = sqlite3.connect('learning_path.db')
    cursor = conn.cursor()
    
    for assessment in data:
        skill_id = assessment.get('skill_id')
        proficiency = assessment.get('proficiency')
        
        if not skill_id or not isinstance(proficiency, (int, float)):
            continue
        
        # Normalize proficiency to 0-1 range
        proficiency = max(0, min(1, proficiency))
        
        # Update or insert skill assessment
        cursor.execute("""
            INSERT OR REPLACE INTO user_skills (user_id, skill_id, proficiency_level, assessment_date)
            VALUES (?, ?, ?, ?)
        """, (user_id, skill_id, proficiency, datetime.datetime.now()))
    
    conn.commit()
    conn.close()
    
    return jsonify({'message': 'Skill assessment saved successfully'})

@app.route('/api/recommendations/skills', methods=['GET'])
def recommend_skills():
    if 'user_id' not in session:
        return jsonify({'error': 'Not logged in'}), 401
    
    user_id = session['user_id']
    
    # Get user's current skills
    conn = sqlite3.connect('learning_path.db')
    
    user_skills_df = pd.read_sql(f"""
        SELECT skill_id, proficiency_level
        FROM user_skills
        WHERE user_id = {user_id}
    """, conn)
    
    # If user has no skills yet, recommend beginner skills
    if len(user_skills_df) == 0:
        beginner_skills = pd.read_sql("""
            SELECT id, name, category, description
            FROM skills
            WHERE difficulty = 1
            LIMIT 5
        """, conn)
        
        conn.close()
        return jsonify({
            'beginner_recommendations': beginner_skills.to_dict('records'),
            'next_level_recommendations': [],
            'based_on_interests': []
        })
    
    # Create skill graph
    skill_graph = SkillGraph()
    
    # Find mastered skills (proficiency > 0.7)
    mastered_skills = user_skills_df[user_skills_df['proficiency_level'] > 0.7]['skill_id'].tolist()
    
    # Get next level skills based on mastered skills
    next_level_skills = []
    for skill_id in mastered_skills:
        dependents = skill_graph.get_dependent_skills(skill_id)
        next_level_skills.extend(dependents)
    
    # Remove duplicates and already mastered skills
    next_level_skills = list(set(next_level_skills) - set(mastered_skills))
    
    # Get user's learning profile and interests
    user_profile = pd.read_sql(f"""
        SELECT interests
        FROM user_profiles
        WHERE user_id = {user_id}
    """, conn)
    
    interest_recommendations = []
    if len(user_profile) > 0 and user_profile.iloc[0]['interests']:
        try:
            interests = json.loads(user_profile.iloc[0]['interests'])
            
            # Query skills related to interests
            skills_df = pd.read_sql("""
                SELECT id, name, category, description
                FROM skills
            """, conn)
            
            # Simple matching based on keywords in skill names and descriptions
            if interests and isinstance(interests, list):
                for interest in interests:
                    interest_lower = interest.lower()
                    for _, skill in skills_df.iterrows():
                        name_match = interest_lower in skill['name'].lower()
                        desc_match = skill['description'] and interest_lower in skill['description'].lower()
                        category_match = skill['category'] and interest_lower in skill['category'].lower()
                        
                        if (name_match or desc_match or category_match) and skill['id'] not in mastered_skills:
                            # Cast to int so the ID binds cleanly as a SQLite parameter later
                            interest_recommendations.append(int(skill['id']))
        except (ValueError, TypeError):
            pass
    
    # Get skill details
    if next_level_skills:
        next_skills_str = ','.join('?' * len(next_level_skills))
        next_level_df = pd.read_sql(f"""
            SELECT id, name, category, description
            FROM skills
            WHERE id IN ({next_skills_str})
        """, conn, params=next_level_skills)
        
        next_level_data = next_level_df.to_dict('records')
    else:
        next_level_data = []
    
    if interest_recommendations:
        interest_skills_str = ','.join('?' * len(interest_recommendations))
        interest_df = pd.read_sql(f"""
            SELECT id, name, category, description
            FROM skills
            WHERE id IN ({interest_skills_str})
        """, conn, params=interest_recommendations)
        
        interest_data = interest_df.to_dict('records')
    else:
        interest_data = []
    
    # Get some beginner-friendly skills that user hasn't mastered yet
    all_recommended = set(next_level_skills + interest_recommendations + mastered_skills)
    if all_recommended:
        exclude_str = ','.join('?' * len(all_recommended))
        beginner_df = pd.read_sql(f"""
            SELECT id, name, category, description
            FROM skills
            WHERE difficulty = 1 AND id NOT IN ({exclude_str})
            LIMIT 3
        """, conn, params=list(all_recommended))
    else:
        beginner_df = pd.read_sql("""
            SELECT id, name, category, description
            FROM skills
            WHERE difficulty = 1
            LIMIT 3
        """, conn)
    
    conn.close()
    
    return jsonify({
        'beginner_recommendations': beginner_df.to_dict('records'),
        'next_level_recommendations': next_level_data[:5],  # Limit to 5
        'based_on_interests': interest_data[:5]  # Limit to 5
    })

@app.route('/api/recommendations/learning-groups', methods=['GET'])
def recommend_groups():
    if 'user_id' not in session:
        return jsonify({'error': 'Not logged in'}), 401
    
    user_id = session['user_id']
    
    # Get skill_id from query params (optional)
    skill_id = request.args.get('skill_id', type=int)
    
    # Get group recommendations
    generator = LearningPathGenerator()
    groups = generator.recommend_learning_groups(user_id, skill_id)
    
    return jsonify({'recommended_groups': groups})

if __name__ == '__main__':
    app.run(debug=True)
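
To smoke-test the API end to end, here is a minimal client sketch using the requests library, assuming the server is running locally on Flask's default port; the username, password, and skill choice are placeholder values.

# api_smoke_test.py (illustrative client for the routes above)
import requests

BASE = 'http://127.0.0.1:5000'
session = requests.Session()  # a Session keeps the login cookie across calls

# Register a throwaway user (this also logs us in)
session.post(f'{BASE}/api/register', json={
    'username': 'demo', 'email': 'demo@example.com', 'password': 'secret123'})

# Browse the skill catalog and pick the first skill we find
skills_by_category = session.get(f'{BASE}/api/skills').json()
first_skill_id = next(iter(skills_by_category.values()))[0]['id']

# Generate a learning path toward that skill and print it
path = session.post(f'{BASE}/api/learning-paths', json={
    'target_skills': [first_skill_id], 'name': 'My first path'}).json()
print(path)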


10. Environmental Monitoring System with IoT Integration

A futuristic Environmental Monitoring System with IoT Integration. The scene features smart sensors placed in nature, collecting real-time data on air quality, water pollution, and climate conditions. A digital dashboard displays live environmental metrics with AI-driven analysis and predictive insights. Scientists and researchers monitor the data remotely through a holographic interface. The background showcases a sustainable landscape with green energy solutions, including solar panels and wind turbines.

Difficulty Level: Intermediate to Advanced

Skills Developed: IoT, Sensor Programming, Data Analysis, Cloud Integration, Visualization

Project Description: Create an environmental monitoring system that collects data from distributed sensors (temperature, humidity, air quality, noise levels) across a college campus. This system analyzes patterns, identifies anomalies, and provides insights to improve campus sustainability and student well-being.

Key Features:

  • Sensor data collection and storage
  • Real-time monitoring dashboard
  • Anomaly detection algorithms
  • Environmental trend analysis
  • Notification system for threshold violations
  • API integration for weather data correlation

Implementation Steps:

  1. Set up sensors (or simulate sensor data)
  2. Create a data collection backend with Flask or Django
  3. Implement a database for storing sensor readings
  4. Develop analysis algorithms for pattern detection
  5. Build visualization dashboards using Plotly Dash
  6. Implement notification systems using webhooks or SMS

Sample Code Implementation:

# environmental_monitoring.py
import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
import sqlite3
import datetime
import time
import json
import os
import requests
import smtplib
from email.message import EmailMessage
import threading
import logging
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import random

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("environment_monitor.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Environment data model
class EnvironmentDatabase:
    def __init__(self, db_path='environment_data.db'):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        """Initialize the SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Create sensor locations table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS sensor_locations (
                id INTEGER PRIMARY KEY,
                location_name TEXT NOT NULL,
                building TEXT,
                floor INTEGER,
                room TEXT,
                latitude REAL,
                longitude REAL,
                indoor BOOLEAN,
                notes TEXT
            )
        ''')

        # Create sensors table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS sensors (
                id INTEGER PRIMARY KEY,
                sensor_type TEXT NOT NULL,
                model TEXT,
                manufacturer TEXT,
                install_date DATE,
                last_maintenance DATE,
                location_id INTEGER,
                status TEXT DEFAULT 'active',
                FOREIGN KEY (location_id) REFERENCES sensor_locations (id)
            )
        ''')

        # Create sensor readings table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS sensor_readings (
                id INTEGER PRIMARY KEY,
                sensor_id INTEGER,
                timestamp DATETIME NOT NULL,
                temperature REAL,
                humidity REAL,
                air_quality_index REAL,
                co2_level REAL,
                noise_level REAL,
                light_level REAL,
                pressure REAL,
                particulate_matter REAL,
                voc_level REAL,
                battery_level REAL,
                FOREIGN KEY (sensor_id) REFERENCES sensors (id)
            )
        ''')

        # Create alerts table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY,
                sensor_id INTEGER,
                timestamp DATETIME NOT NULL,
                alert_type TEXT NOT NULL,
                measurement_type TEXT NOT NULL,
                measurement_value REAL,
                threshold_value REAL,
                message TEXT,
                acknowledged BOOLEAN DEFAULT 0,
                FOREIGN KEY (sensor_id) REFERENCES sensors (id)
            )
        ''')

        # Create thresholds table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS thresholds (
                id INTEGER PRIMARY KEY,
                measurement_type TEXT NOT NULL,
                min_value REAL,
                max_value REAL,
                location_id INTEGER,
                sensor_id INTEGER,
                FOREIGN KEY (location_id) REFERENCES sensor_locations (id),
                FOREIGN KEY (sensor_id) REFERENCES sensors (id)
            )
        ''')

        # Create external weather data table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS external_weather (
                id INTEGER PRIMARY KEY,
                timestamp DATETIME NOT NULL,
                temperature REAL,
                humidity REAL,
                pressure REAL,
                wind_speed REAL,
                wind_direction TEXT,
                precipitation REAL,
                weather_condition TEXT,
                source TEXT
            )
        ''')

        conn.commit()
        conn.close()

        self.load_sample_data()

    def load_sample_data(self):
        """Load sample data if the database is empty."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Check if we already have data
        cursor.execute("SELECT COUNT(*) FROM sensor_locations")
        if cursor.fetchone()[0] > 0:
            conn.close()
            return

        # Sample sensor locations
        locations = [
            (1, 'Science Building Lobby', 'Science Building', 1, 'Lobby', 40.7128, -74.0060, True, 'Main entrance area'),
            (2, 'Science Building Lab 101', 'Science Building', 1, '101', 40.7128, -74.0061, True, 'Chemistry lab'),
            (3, 'Science Building Roof', 'Science Building', 4, 'Roof', 40.7129, -74.0060, False, 'Outdoor sensor'),
            (4, 'Library Main Hall', 'Library', 1, 'Main Hall', 40.7135, -74.0070, True, 'Reading area'),
            (5, 'Student Center Cafeteria', 'Student Center', 1, 'Cafeteria', 40.7140, -74.0075, True, 'Dining area'),
            (6, 'Engineering Building Room 201', 'Engineering Building', 2, '201', 40.7145, -74.0080, True, 'Computer lab'),
            (7, 'Dormitory A Common Room', 'Dormitory A', 1, 'Common Room', 40.7150, -74.0090, True, 'Student lounge'),
            (8, 'Sports Field', 'Outdoor', 0, 'Field', 40.7155, -74.0095, False, 'Soccer field')
        ]

        cursor.executemany('''
            INSERT INTO sensor_locations (id, location_name, building, floor, room, latitude, longitude, indoor, notes)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', locations)

        # Sample sensors
        sensors = [
            (1, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-01-15', '2023-06-15', 1, 'active'),
            (2, 'air_quality', 'PMS5003', 'Plantower', '2023-01-15', '2023-06-15', 1, 'active'),
            (3, 'noise', 'SEN-12642', 'SparkFun', '2023-01-15', '2023-06-15', 1, 'active'),
            (4, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-01-20', '2023-06-20', 2, 'active'),
            (5, 'air_quality', 'PMS5003', 'Plantower', '2023-01-20', '2023-06-20', 2, 'active'),
            (6, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-01-25', '2023-06-25', 3, 'active'),
            (7, 'air_quality', 'PMS5003', 'Plantower', '2023-01-25', '2023-06-25', 3, 'active'),
            (8, 'noise', 'SEN-12642', 'SparkFun', '2023-01-25', '2023-06-25', 3, 'active'),
            (9, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-01', '2023-07-01', 4, 'active'),
            (10, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-05', '2023-07-05', 5, 'active'),
            (11, 'air_quality', 'PMS5003', 'Plantower', '2023-02-05', '2023-07-05', 5, 'active'),
            (12, 'noise', 'SEN-12642', 'SparkFun', '2023-02-05', '2023-07-05', 5, 'active'),
            (13, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-10', '2023-07-10', 6, 'active'),
            (14, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-15', '2023-07-15', 7, 'active'),
            (15, 'temperature_humidity', 'DHT22', 'Adafruit', '2023-02-20', '2023-07-20', 8, 'active'),
            (16, 'air_quality', 'PMS5003', 'Plantower', '2023-02-20', '2023-07-20', 8, 'active')
        ]

        cursor.executemany('''
            INSERT INTO sensors (id, sensor_type, model, manufacturer, install_date, last_maintenance, location_id, status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', sensors)

        # Sample thresholds
        thresholds = [
            (1, 'temperature', 18.0, 28.0, None, None),        # Global temperature thresholds
            (2, 'humidity', 30.0, 60.0, None, None),           # Global humidity thresholds
            (3, 'air_quality_index', 0.0, 100.0, None, None),  # Global AQI thresholds
            (4, 'co2_level', 400.0, 1000.0, None, None),       # Global CO2 thresholds
            (5, 'noise_level', 40.0, 70.0, None, None),        # Global noise thresholds
            (6, 'temperature', 20.0, 24.0, 2, None),           # Lab-specific temperature thresholds
            (7, 'temperature', 22.0, 26.0, 5, None),           # Cafeteria-specific temperature thresholds
            (8, 'noise_level', 40.0, 60.0, 4, None),           # Library-specific noise thresholds
            (9, 'air_quality_index', 0.0, 50.0, 6, None)       # Computer lab-specific AQI thresholds
        ]

        cursor.executemany('''
            INSERT INTO thresholds (id, measurement_type, min_value, max_value, location_id, sensor_id)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', thresholds)

        conn.commit()
        conn.close()

        # Generate sample readings for the past 7 days
        self.generate_sample_readings(days=7)

    def generate_sample_readings(self, days=7):
        """Generate realistic sample sensor readings for testing."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Get all sensors, including last_maintenance so we don't have to
        # re-query it for every simulated reading
        cursor.execute("SELECT id, sensor_type, location_id, last_maintenance FROM sensors")
        sensors = cursor.fetchall()

        # Current time
        end_time = datetime.datetime.now()
        start_time = end_time - datetime.timedelta(days=days)

        # Sample at 15-minute intervals
        interval_minutes = 15

        # Generate readings for each sensor
        readings = []
        current_time = start_time

        while current_time <= end_time:
            for sensor_id, sensor_type, location_id, last_maintenance in sensors:
                # Base values with some daily patterns
                hour_of_day = current_time.hour

                # Temperature varies throughout the day
                temp_base = 22.0  # Base indoor temperature
                if location_id in [3, 8]:  # Outdoor sensors
                    # More variation for outdoor sensors
                    temp_base = 15.0 + 10.0 * np.sin(np.pi * (hour_of_day - 6) / 12)  # Peak at noon
                else:
                    # Indoor follows outdoor but with less variation and some lag
                    temp_base = 21.0 + 2.0 * np.sin(np.pi * (hour_of_day - 8) / 12)

                # Add some day-to-day variation
                day_offset = (current_time.date() - start_time.date()).days
                temp_base += np.sin(day_offset * 0.5) * 3  # Slow weekly pattern

                # Add noise
                temperature = temp_base + np.random.normal(0, 0.5)

                # Humidity inversely related to temperature with some randomness
                humidity_base = 80.0 - temperature * 1.5
                humidity = max(30, min(95, humidity_base + np.random.normal(0, 5)))

                # Air quality worse during busy hours
                aqi_base = 50.0
                if 8 <= hour_of_day <= 18:  # Work hours
                    aqi_base = 60.0 + (10.0 * np.sin(np.pi * (hour_of_day - 8) / 10))
                air_quality_index = max(20, min(150, aqi_base + np.random.normal(0, 8)))

                # CO2 follows occupancy patterns
                co2_base = 400.0  # Baseline outdoor level
                if location_id not in [3, 8]:  # Indoor
                    if 8 <= hour_of_day <= 18:  # Work hours
                        co2_base = 600.0 + (200.0 * np.sin(np.pi * (hour_of_day - 8) / 10))
                co2_level = max(400, min(1500, co2_base + np.random.normal(0, 30)))

                # Noise level based on location and time
                noise_base = 45.0
                if location_id == 4:  # Library
                    noise_base = 35.0
                elif location_id == 5:  # Cafeteria
                    if 11 <= hour_of_day <= 14 or 17 <= hour_of_day <= 19:  # Meal times
                        noise_base = 65.0
                    else:
                        noise_base = 45.0
                elif location_id in [3, 8]:  # Outdoor
                    noise_base = 50.0 + (10.0 * np.sin(np.pi * (hour_of_day - 8) / 10))
                noise_level = max(30, min(80, noise_base + np.random.normal(0, 5)))

                # Light level based on time
                light_base = 0.0
                if 6 <= hour_of_day <= 18:  # Daylight
                    light_base = 300.0 + (300.0 * np.sin(np.pi * (hour_of_day - 6) / 12))
                light_level = max(0, min(800, light_base + np.random.normal(0, 30)))

                # Pressure (relatively stable with weather patterns)
                pressure_base = 1013.0  # Standard pressure
                pressure = pressure_base + day_offset * 0.5 + np.random.normal(0, 1)

                # PM2.5 levels correlate with air quality
                pm_base = air_quality_index * 0.5
                particulate_matter = max(5, min(300, pm_base + np.random.normal(0, 10)))

                # VOC levels
                voc_base = 400.0
                if location_id == 2:  # Chemistry lab
                    voc_base = 600.0
                voc_level = max(100, min(1000, voc_base + np.random.normal(0, 50)))

                # Battery level (slowly decreases over time, reset at maintenance);
                # last_maintenance was fetched once with the sensor list above
                days_since_maintenance = (current_time.date() - datetime.datetime.strptime(
                    last_maintenance, "%Y-%m-%d").date()).days
                battery_level = 100.0 - (days_since_maintenance * 0.5) + np.random.normal(0, 0.1)
                battery_level = max(0, min(100, battery_level))

                # Initialize with None, then set values based on sensor type
                reading = {
                    'sensor_id': sensor_id,
                    'timestamp': current_time.isoformat(),
                    'temperature': None,
                    'humidity': None,
                    'air_quality_index': None,
                    'co2_level': None,
                    'noise_level': None,
                    'light_level': None,
                    'pressure': None,
                    'particulate_matter': None,
                    'voc_level': None,
                    'battery_level': battery_level
                }

                # Set values based on sensor type
                if sensor_type == 'temperature_humidity':
                    reading['temperature'] = temperature
                    reading['humidity'] = humidity
                    reading['pressure'] = pressure
                elif sensor_type == 'air_quality':
                    reading['air_quality_index'] = air_quality_index
                    reading['co2_level'] = co2_level
                    reading['particulate_matter'] = particulate_matter
                    reading['voc_level'] = voc_level
                elif sensor_type == 'noise':
                    reading['noise_level'] = noise_level
                    reading['light_level'] = light_level

                readings.append((
                    sensor_id,
                    current_time.isoformat(),
                    reading['temperature'],
                    reading['humidity'],
                    reading['air_quality_index'],
                    reading['co2_level'],
                    reading['noise_level'],
                    reading['light_level'],
                    reading['pressure'],
                    reading['particulate_matter'],
                    reading['voc_level'],
                    reading['battery_level']
                ))

            # Increment time
            current_time += datetime.timedelta(minutes=interval_minutes)

        # Insert in batches to avoid SQLite limitations
        batch_size = 1000
        for i in range(0, len(readings), batch_size):
            batch = readings[i:i+batch_size]
            cursor.executemany('''
                INSERT INTO sensor_readings (
                    sensor_id, timestamp, temperature, humidity, air_quality_index,
                    co2_level, noise_level, light_level, pressure, particulate_matter,
                    voc_level, battery_level
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', batch)
            conn.commit()

        # Generate some external weather data
        weather_data = []
        current_time = start_time

        while current_time <= end_time:
            # Only store at hourly intervals
            if current_time.minute == 0:
                # Base temperature varies by time of day
                hour_of_day = current_time.hour
                temp_base = 15.0 + 10.0 * np.sin(np.pi * (hour_of_day - 6) / 12)  # Peak at noon

                # Add some day-to-day variation
                day_offset = (current_time.date() - start_time.date()).days
                temp_base += np.sin(day_offset * 0.5) * 3  # Slow weekly pattern

                temperature = temp_base + np.random.normal(0, 0.5)

                # Humidity inversely related to temperature
                humidity = 80.0 - temperature * 1.5 + np.random.normal(0, 5)
                humidity = max(30, min(95, humidity))

                # Pressure varies slowly with some randomness
                pressure = 1013.0 + day_offset * 0.5 + np.random.normal(0, 1)

                # Wind speed and direction
                wind_speed = max(0, np.random.normal(5, 2))
                directions = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
                wind_direction = random.choice(directions)

                # Precipitation (mostly zero with occasional rain)
                precipitation = 0.0
                if random.random() < 0.2:  # 20% chance of rain
                    precipitation = random.uniform(0, 10)

                # Weather condition
                if precipitation > 1.0:
                    weather_condition = 'rain'
                elif humidity > 80:
                    weather_condition = 'fog'
                elif temperature > 25:
                    weather_condition = 'sunny'
                else:
                    weather_condition = 'cloudy'

                weather_data.append((
                    current_time.isoformat(),
                    temperature,
                    humidity,
                    pressure,
                    wind_speed,
                    wind_direction,
                    precipitation,
                    weather_condition,
                    'simulated'
                ))

            current_time += datetime.timedelta(minutes=15)

        cursor.executemany('''
            INSERT INTO external_weather (
                timestamp, temperature, humidity, pressure, wind_speed,
                wind_direction, precipitation, weather_condition, source
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', weather_data)

        conn.commit()
        conn.close()

    def get_sensor_info(self):
        """Get information about all sensors."""
        conn = sqlite3.connect(self.db_path)

        query = '''
            SELECT s.id, s.sensor_type, s.model, s.manufacturer, s.status,
                   l.id as location_id, l.location_name, l.building, l.floor, l.room, l.indoor
            FROM sensors s
            JOIN sensor_locations l ON s.location_id = l.id
        '''

        df = pd.read_sql_query(query, conn)
        conn.close()

        return df

    def get_latest_readings(self):
        """Get the latest reading from each sensor."""
        conn = sqlite3.connect(self.db_path)

        query = '''
            WITH LatestReadings AS (
                SELECT sensor_id, MAX(timestamp) as max_time
                FROM sensor_readings
                GROUP BY sensor_id
            )
            SELECT sr.*, s.sensor_type, l.location_name, l.building
            FROM sensor_readings sr
            JOIN LatestReadings lr ON sr.sensor_id = lr.sensor_id AND sr.timestamp = lr.max_time
            JOIN sensors s ON sr.sensor_id = s.id
            JOIN sensor_locations l ON s.location_id = l.id
        '''

        df = pd.read_sql_query(query, conn)
        conn.close()

        return df

    def get_readings_for_period(self, start_time, end_time, sensor_ids=None, measurement_types=None):
        """Get sensor readings for a specific time period."""
        conn = sqlite3.connect(self.db_path)

        params = [start_time, end_time]

        query = '''
            SELECT sr.*, s.sensor_type, l.location_name, l.building
            FROM sensor_readings sr
            JOIN sensors s ON sr.sensor_id = s.id
            JOIN sensor_locations l ON s.location_id = l.id
            WHERE sr.timestamp >= ? AND sr.timestamp <= ?
        '''

        if sensor_ids:
            placeholders = ','.join('?' * len(sensor_ids))
            query += f' AND sr.sensor_id IN ({placeholders})'
            params.extend(sensor_ids)

        query += ' ORDER BY sr.timestamp'

        df = pd.read_sql_query(query, conn, params=params)
        conn.close()

        # Filter by measurement types if specified
        if measurement_types and len(df) > 0:
            df = df[['timestamp', 'sensor_id', 'sensor_type', 'location_name', 'building'] + measurement_types]

        return df

    def get_thresholds(self, location_id=None, sensor_id=None):
        """Get threshold settings."""
        conn = sqlite3.connect(self.db_path)

        query = 'SELECT * FROM thresholds WHERE 1=1'
        params = []

        if location_id:
            query += ' AND (location_id = ? OR location_id IS NULL)'
            params.append(location_id)

        if sensor_id:
            query += ' AND (sensor_id = ? OR sensor_id IS NULL)'
            params.append(sensor_id)

        df = pd.read_sql_query(query, conn, params=params)
        conn.close()

        return df

    def add_sensor_reading(self, sensor_id, data):
        """Add a new sensor reading to the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Check that sensor exists
        cursor.execute("SELECT id FROM sensors WHERE id = ?", (sensor_id,))
        if not cursor.fetchone():
            conn.close()
            return False

        # Prepare data
        timestamp = data.get('timestamp', datetime.datetime.now().isoformat())

        cursor.execute('''
            INSERT INTO sensor_readings (
                sensor_id, timestamp, temperature, humidity, air_quality_index,
                co2_level, noise_level, light_level, pressure, particulate_matter,
                voc_level, battery_level
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            sensor_id,
            timestamp,
            data.get('temperature'),
            data.get('humidity'),
            data.get('air_quality_index'),
            data.get('co2_level'),
            data.get('noise_level'),
            data.get('light_level'),
            data.get('pressure'),
            data.get('particulate_matter'),
            data.get('voc_level'),
            data.get('battery_level')
        ))

        conn.commit()

        # Check against thresholds
        self.check_thresholds(conn, sensor_id, data)

        conn.close()
        return True
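
    # Illustrative call to add_sensor_reading -- any subset of the reading columns
    # may be supplied, and missing fields are stored as NULL:
    #
    #   db.add_sensor_reading(1, {'temperature': 22.4, 'humidity': 48.0,
    #                             'battery_level': 97.5})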

    def check_thresholds(self, conn, sensor_id, data):
        """Check if readings exceed thresholds and create alerts."""
        cursor = conn.cursor()

        # Get sensor location
        cursor.execute("SELECT location_id FROM sensors WHERE id = ?", (sensor_id,))
        location_id = cursor.fetchone()[0]

        # Get applicable thresholds
        cursor.execute('''
            SELECT measurement_type, min_value, max_value
            FROM thresholds
            WHERE (location_id = ? OR location_id IS NULL)
              AND (sensor_id = ? OR sensor_id IS NULL)
        ''', (location_id, sensor_id))

        thresholds = cursor.fetchall()

        # Check each measurement against thresholds
        timestamp = data.get('timestamp', datetime.datetime.now().isoformat())

        for measurement_type, min_value, max_value in thresholds:
            value = data.get(measurement_type)

            if value is None:
                continue

            if min_value is not None and value < min_value:
                # Create low threshold alert
                cursor.execute('''
                    INSERT INTO alerts (
                        sensor_id, timestamp, alert_type, measurement_type,
                        measurement_value, threshold_value, message
                    ) VALUES (?, ?, ?, ?, ?, ?, ?)
                ''', (
                    sensor_id,
                    timestamp,
                    'low',
                    measurement_type,
                    value,
                    min_value,
                    f"{measurement_type} reading ({value}) below minimum threshold ({min_value})"
                ))

            if max_value is not None and value > max_value:
                # Create high threshold alert
                cursor.execute('''
                    INSERT INTO alerts (
                        sensor_id, timestamp, alert_type, measurement_type,
                        measurement_value, threshold_value, message
                    ) VALUES (?, ?, ?, ?, ?, ?, ?)
                ''', (
                    sensor_id,
                    timestamp,
                    'high',
                    measurement_type,
                    value,
                    max_value,
                    f"{measurement_type} reading ({value}) above maximum threshold ({max_value})"
                ))

        conn.commit()

    def get_alerts(self, start_time=None, end_time=None, acknowledged=None):
        """Get alerts with optional filtering."""
        conn = sqlite3.connect(self.db_path)

        query = '''
            SELECT a.*, s.sensor_type, l.location_name, l.building
            FROM alerts a
            JOIN sensors s ON a.sensor_id = s.id
            JOIN sensor_locations l ON s.location_id = l.id
            WHERE 1=1
        '''

        params = []

        if start_time:
            query += ' AND a.timestamp >= ?'
            params.append(start_time)

        if end_time:
            query += ' AND a.timestamp <= ?'
            params.append(end_time)

        if acknowledged is not None:
            query += ' AND a.acknowledged = ?'
            params.append(1 if acknowledged else 0)

        query += ' ORDER BY a.timestamp DESC'

        df = pd.read_sql_query(query, conn, params=params)
        conn.close()

        return df

    def acknowledge_alert(self, alert_id):
        """Mark an alert as acknowledged."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("UPDATE alerts SET acknowledged = 1 WHERE id = ?", (alert_id,))

        rows_affected = cursor.rowcount
        conn.commit()
        conn.close()

        return rows_affected > 0

    def get_external_weather(self, start_time, end_time):
        """Get external weather data for a specific time period."""
        conn = sqlite3.connect(self.db_path)

        query = '''
            SELECT *
            FROM external_weather
            WHERE timestamp >= ? AND timestamp <= ?
            ORDER BY timestamp
        '''

        df = pd.read_sql_query(query, conn, params=[start_time, end_time])
        conn.close()

        return df

class DataAnalyzer:
"""Analyze environmental data for patterns and anomalies."""

def __init__(self, db):
self.db = db

def detect_anomalies(self, data, columns, contamination=0.05):
"""Detect anomalies in sensor readings using Isolation Forest."""
if len(data) < 10:
return {}

# Prepare data for anomaly detection
features = data[columns].copy()

# Handle missing values
features = features.fillna(method='ffill').fillna(method='bfill')

if features.isna().any().any():
# If still have NaNs, drop those columns
features = features.dropna(axis=1)

if len(features.columns) == 0 or len(features) < 10:
return {}

# Standardize features
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)

# Train isolation forest
clf = IsolationForest(contamination=contamination, random_state=42)
predictions = clf.fit_predict(scaled_features)

# Anomaly scores
scores = clf.decision_function(scaled_features)

# Identify anomalies
anomalies = {}
for i, pred in enumerate(predictions):
if pred == -1: # Anomaly
timestamp = data.iloc[i]['timestamp']
anomaly_data = {
'score': scores[i],
'values': {col: data.iloc[i][col] for col in columns if not pd.isna(data.iloc[i][col])}
}
anomalies[timestamp] = anomaly_data

return anomalies

    def identify_patterns(self, data, column, period='daily'):
        """Identify patterns in sensor data (e.g., daily patterns)."""
        if len(data) < 24:
            return None

        # Convert timestamp to datetime if it's not already
        if not pd.api.types.is_datetime64_any_dtype(data['timestamp']):
            data = data.copy()
            data['timestamp'] = pd.to_datetime(data['timestamp'])

        # Extract time components
        if period == 'daily':
            data['hour'] = data['timestamp'].dt.hour
            groupby_col = 'hour'
        elif period == 'weekly':
            data['day_of_week'] = data['timestamp'].dt.dayofweek
            groupby_col = 'day_of_week'
        elif period == 'monthly':
            data['day'] = data['timestamp'].dt.day
            groupby_col = 'day'
        else:
            return None

        # Group by time component and calculate statistics
        pattern = data.groupby(groupby_col)[column].agg(['mean', 'std', 'min', 'max']).reset_index()

        return pattern

    def correlate_with_weather(self, sensor_data, weather_data, sensor_column, weather_column):
        """Correlate internal sensor readings with external weather data."""
        if len(sensor_data) < 10 or len(weather_data) < 10:
            return None

        # Convert timestamps to datetime if they're not already
        if not pd.api.types.is_datetime64_any_dtype(sensor_data['timestamp']):
            sensor_data = sensor_data.copy()
            sensor_data['timestamp'] = pd.to_datetime(sensor_data['timestamp'])

        if not pd.api.types.is_datetime64_any_dtype(weather_data['timestamp']):
            weather_data = weather_data.copy()
            weather_data['timestamp'] = pd.to_datetime(weather_data['timestamp'])

        # Merge data on the closest timestamp, within a one-hour tolerance
        merged = pd.merge_asof(
            sensor_data.sort_values('timestamp'),
            weather_data.sort_values('timestamp')[['timestamp', weather_column]],
            on='timestamp',
            direction='nearest',
            tolerance=pd.Timedelta('1h')
        )

        # Drop rows with missing values
        valid_data = merged.dropna(subset=[sensor_column, weather_column])

        if len(valid_data) < 10:
            return None

        # Calculate correlation
        correlation, p_value = stats.pearsonr(valid_data[sensor_column], valid_data[weather_column])

        return {
            'correlation': correlation,
            'p_value': p_value,
            'significant': p_value < 0.05,
            'sample_size': len(valid_data)
        }

    def calculate_statistics(self, data, columns, groupby=None):
        """Calculate summary statistics for sensor readings."""
        if len(data) == 0:
            return {}

        summary = {}  # named 'summary' to avoid shadowing the scipy 'stats' import

        if groupby:
            # Group by specified column(s)
            grouped = data.groupby(groupby)

            for col in columns:
                if col in data.columns:
                    summary[col] = {
                        'by_group': grouped[col].agg(['count', 'mean', 'std', 'min', 'max']).to_dict()
                    }
        else:
            # Overall statistics
            for col in columns:
                if col in data.columns:
                    summary[col] = {
                        'count': data[col].count(),
                        'mean': data[col].mean(),
                        'std': data[col].std(),
                        'min': data[col].min(),
                        'max': data[col].max(),
                        'median': data[col].median(),
                        'q1': data[col].quantile(0.25),
                        'q3': data[col].quantile(0.75)
                    }

        return summary

    def forecast_trends(self, data, column, periods=24):
        """Simple forecasting using historical patterns."""
        if len(data) < 48:
            return None

        # Convert timestamp to datetime if it's not already
        if not pd.api.types.is_datetime64_any_dtype(data['timestamp']):
            data = data.copy()
            data['timestamp'] = pd.to_datetime(data['timestamp'])

        # Resample to hourly data
        hourly_data = data.set_index('timestamp')[column].resample('h').mean()

        # Fill missing values
        hourly_data = hourly_data.ffill().bfill()

        if hourly_data.isna().any():
            return None

        # Determine seasonality (24 hours for daily pattern)
        seasonality = 24

        # Simple forecasting using the seasonal mean
        last_period = hourly_data.iloc[-seasonality:].reset_index(drop=True)

        forecast = []
        for i in range(periods):
            # Use the corresponding hour from the last observed period
            forecast_value = last_period[i % len(last_period)]
            forecast.append(forecast_value)

        # Generate forecast timestamps
        last_timestamp = hourly_data.index[-1]
        forecast_timestamps = [last_timestamp + datetime.timedelta(hours=i+1) for i in range(periods)]

        forecast_df = pd.DataFrame({
            'timestamp': forecast_timestamps,
            f'forecast_{column}': forecast
        })

        return forecast_df

class NotificationManager:
"""Manage alerts and notifications for threshold violations."""

def __init__(self, email_config=None, webhook_url=None):
self.email_config = email_config or {}
self.webhook_url = webhook_url

def send_email_notification(self, subject, message):
"""Send an email notification."""
if not self.email_config:
logger.warning("Email configuration not provided, skipping email notification")
return False

try:
msg = EmailMessage()
msg.set_content(message)
msg['Subject'] = subject
msg['From'] = self.email_config.get('sender', 'noreply@example.com')
msg['To'] = self.email_config.get('recipient', 'admin@example.com')

with smtplib.SMTP(self.email_config.get('server', 'localhost'),
self.email_config.get('port', 25)) as server:
if self.email_config.get('use_tls', False):
server.starttls()

if 'username' in self.email_config and 'password' in self.email_config:
server.login(self.email_config['username'], self.email_config['password'])

server.send_message(msg)

logger.info(f"Email notification sent: {subject}")
return True

except Exception as e:
logger.error(f"Failed to send email notification: {str(e)}")
return False

def send_webhook_notification(self, payload):
"""Send a notification to a webhook."""
if not self.webhook_url:
logger.warning("Webhook URL not provided, skipping webhook notification")
return False

try:
response = requests.post(
self.webhook_url,
json=payload,
headers={'Content-Type': 'application/json'}
)

if response.status_code >= 200 and response.status_code < 300:
logger.info(f"Webhook notification sent successfully")
return True
else:
logger.warning(f"Webhook returned non-success status code: {response.status_code}")
return False

except Exception as e:
logger.error(f"Failed to send webhook notification: {str(e)}")
return False

def process_alert(self, alert):
"""Process an alert and send appropriate notifications."""
# Format alert message
subject = f"Environmental Alert: {alert['alert_type'].title()} {alert['measurement_type']} at {alert['location_name']}"

message = f"""
Environmental Monitoring System Alert

Location: {alert['building']} - {alert['location_name']}
Sensor Type: {alert['sensor_type']}
Alert Type: {alert['alert_type'].title()}
Measurement: {alert['measurement_type']}
Value: {alert['measurement_value']}
Threshold: {alert['threshold_value']}
Timestamp: {alert['timestamp']}

Message: {alert['message']}

This is an automated alert from the Environmental Monitoring System.
"""

# Send email notification
email_sent = self.send_email_notification(subject, message)

# Send webhook notification
webhook_payload = {
'alert_id': alert['id'],
'alert_type': alert['alert_type'],
'measurement_type': alert['measurement_type'],
'measurement_value': float(alert['measurement_value']),
'threshold_value': float(alert['threshold_value']),
'location': alert['location_name'],
'building': alert['building'],
'sensor_type': alert['sensor_type'],
'timestamp': alert['timestamp'],
'message': alert['message']
}
webhook_sent = self.send_webhook_notification(webhook_payload)

return {
'email_sent': email_sent,
'webhook_sent': webhook_sent
}

# Initialize database
db = EnvironmentDatabase()
analyzer = DataAnalyzer(db)
notification_manager = NotificationManager(
webhook_url='http://example.com/webhook' # Replace with actual webhook URL
)
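
# Hypothetical email setup - every value below is a placeholder; swap in your
# own SMTP details before enabling email alerts:
# notification_manager = NotificationManager(
#     email_config={'server': 'smtp.example.com', 'port': 587, 'use_tls': True,
#                   'sender': 'alerts@example.com', 'recipient': 'admin@example.com',
#                   'username': 'alerts', 'password': 'secret'},
#     webhook_url='http://example.com/webhook'
# )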

# Flask API
server = Flask(__name__)

@server.route('/api/sensors', methods=['GET'])
def get_sensors():
sensors = db.get_sensor_info()
return jsonify(sensors.to_dict(orient='records'))

@server.route('/api/readings/latest', methods=['GET'])
def get_latest_readings():
readings = db.get_latest_readings()
return jsonify(readings.to_dict(orient='records'))

@server.route('/api/readings', methods=['GET'])
def get_readings():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=1)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())

sensor_ids = request.args.getlist('sensor_id')
sensor_ids = [int(s) for s in sensor_ids] if sensor_ids else None

measurement_types = request.args.getlist('measurement_type')

readings = db.get_readings_for_period(start_time, end_time, sensor_ids, measurement_types)
return jsonify(readings.to_dict(orient='records'))

@server.route('/api/sensors/<int:sensor_id>/readings', methods=['POST'])
def add_reading(sensor_id):
data = request.json
success = db.add_sensor_reading(sensor_id, data)

if success:
return jsonify({'status': 'success', 'message': 'Reading added successfully'})
else:
return jsonify({'status': 'error', 'message': 'Failed to add reading'}), 400
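
# Example POST body for /api/sensors/<sensor_id>/readings (fields are optional
# and depend on the sensor type; the values below are illustrative only):
# {"temperature": 22.5, "humidity": 45.0, "co2_level": 650}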

@server.route('/api/alerts', methods=['GET'])
def get_alerts():
start_time = request.args.get('start_time')
end_time = request.args.get('end_time')

acknowledged = request.args.get('acknowledged')
if acknowledged is not None:
acknowledged = acknowledged.lower() == 'true'

alerts = db.get_alerts(start_time, end_time, acknowledged)
return jsonify(alerts.to_dict(orient='records'))

@server.route('/api/alerts/<int:alert_id>/acknowledge', methods=['POST'])
def acknowledge_alert(alert_id):
success = db.acknowledge_alert(alert_id)

if success:
return jsonify({'status': 'success', 'message': 'Alert acknowledged'})
else:
return jsonify({'status': 'error', 'message': 'Alert not found'}), 404

@server.route('/api/analysis/anomalies', methods=['GET'])
def detect_anomalies():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=7)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())

sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)

measurement_type = request.args.get('measurement_type', 'temperature')

readings = db.get_readings_for_period(start_time, end_time, [sensor_id] if sensor_id else None)

if len(readings) == 0:
return jsonify({'status': 'error', 'message': 'No data available for analysis'}), 400

columns = [col for col in readings.columns if col in [
'temperature', 'humidity', 'air_quality_index', 'co2_level',
'noise_level', 'light_level', 'pressure', 'particulate_matter', 'voc_level'
] and not readings[col].isna().all()]

if measurement_type in columns:
columns = [measurement_type]

if not columns:
return jsonify({'status': 'error', 'message': 'No valid measurement data available'}), 400

anomalies = analyzer.detect_anomalies(readings, columns)

return jsonify({
'status': 'success',
'anomalies': anomalies,
'total': len(anomalies)
})

@server.route('/api/analysis/patterns', methods=['GET'])
def identify_patterns():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=7)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())

sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)

measurement_type = request.args.get('measurement_type', 'temperature')
period = request.args.get('period', 'daily')

readings = db.get_readings_for_period(start_time, end_time, [sensor_id] if sensor_id else None)

if len(readings) == 0:
return jsonify({'status': 'error', 'message': 'No data available for analysis'}), 400

if measurement_type not in readings.columns or readings[measurement_type].isna().all():
return jsonify({'status': 'error', 'message': f'No {measurement_type} data available'}), 400

pattern = analyzer.identify_patterns(readings, measurement_type, period)

if pattern is None:
return jsonify({'status': 'error', 'message': 'Failed to identify patterns'}), 400

return jsonify({
'status': 'success',
'period': period,
'measurement_type': measurement_type,
'pattern': pattern.to_dict(orient='records')
})

@server.route('/api/analysis/weather-correlation', methods=['GET'])
def correlate_with_weather():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=30)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())

sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)
else:
return jsonify({'status': 'error', 'message': 'Sensor ID is required'}), 400

sensor_measurement = request.args.get('sensor_measurement', 'temperature')
weather_measurement = request.args.get('weather_measurement', 'temperature')

# Get sensor data
sensor_data = db.get_readings_for_period(start_time, end_time, [sensor_id])

if len(sensor_data) == 0 or sensor_measurement not in sensor_data.columns or sensor_data[sensor_measurement].isna().all():
return jsonify({'status': 'error', 'message': f'No {sensor_measurement} data available for sensor {sensor_id}'}), 400

# Get weather data
weather_data = db.get_external_weather(start_time, end_time)

if len(weather_data) == 0 or weather_measurement not in weather_data.columns or weather_data[weather_measurement].isna().all():
return jsonify({'status': 'error', 'message': f'No {weather_measurement} weather data available'}), 400

correlation = analyzer.correlate_with_weather(sensor_data, weather_data, sensor_measurement, weather_measurement)

if correlation is None:
return jsonify({'status': 'error', 'message': 'Failed to calculate correlation'}), 400

return jsonify({
'status': 'success',
'sensor_measurement': sensor_measurement,
'weather_measurement': weather_measurement,
'correlation': correlation
})

@server.route('/api/analysis/statistics', methods=['GET'])
def calculate_statistics():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=7)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())

sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)

measurement_types = request.args.getlist('measurement_type')
if not measurement_types:
measurement_types = [
'temperature', 'humidity', 'air_quality_index', 'co2_level',
'noise_level', 'light_level', 'pressure', 'particulate_matter', 'voc_level'
]

groupby = request.args.getlist('groupby')

readings = db.get_readings_for_period(start_time, end_time, [sensor_id] if sensor_id else None)

if len(readings) == 0:
return jsonify({'status': 'error', 'message': 'No data available for analysis'}), 400

statistics = analyzer.calculate_statistics(readings, measurement_types, groupby if groupby else None)

return jsonify({
'status': 'success',
'statistics': statistics
})

@server.route('/api/analysis/forecast', methods=['GET'])
def forecast_trends():
start_time = request.args.get('start_time', (datetime.datetime.now() - datetime.timedelta(days=14)).isoformat())
end_time = request.args.get('end_time', datetime.datetime.now().isoformat())

sensor_id = request.args.get('sensor_id')
if sensor_id:
sensor_id = int(sensor_id)
else:
return jsonify({'status': 'error', 'message': 'Sensor ID is required'}), 400

measurement_type = request.args.get('measurement_type', 'temperature')
periods = request.args.get('periods', 24, type=int)

readings = db.get_readings_for_period(start_time, end_time, [sensor_id])

if len(readings) == 0 or measurement_type not in readings.columns or readings[measurement_type].isna().all():
return jsonify({'status': 'error', 'message': f'No {measurement_type} data available for forecast'}), 400

forecast = analyzer.forecast_trends(readings, measurement_type, periods)

if forecast is None:
return jsonify({'status': 'error', 'message': 'Failed to generate forecast'}), 400

return jsonify({
'status': 'success',
'measurement_type': measurement_type,
'forecast': forecast.to_dict(orient='records')
})
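
# Example query (hypothetical sensor id), returning a JSON forecast:
# GET /api/analysis/forecast?sensor_id=1&measurement_type=temperature&periods=24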

# Dash App
dash_app = dash.Dash(__name__, server=server, url_base_pathname='/dashboard/')
dash_app.title = 'Environmental Monitoring Dashboard'

# Define the layout
dash_app.layout = html.Div([
html.Div([
html.H1("Environmental Monitoring Dashboard", style={'margin-bottom': '20px'}),

dcc.Tabs([
dcc.Tab(label="Overview", children=[
html.Div([
html.H2("Current Environmental Conditions"),
html.Div(id="last-updated"),

html.Div([
html.Div([
html.H3("Temperature"),
dcc.Graph(id='temp-gauge')
], className="four columns"),

html.Div([
html.H3("Humidity"),
dcc.Graph(id='humidity-gauge')
], className="four columns"),

html.Div([
html.H3("Air Quality"),
dcc.Graph(id='aqi-gauge')
], className="four columns"),
], className="row"),

html.Div([
html.Div([
html.H3("CO2 Levels"),
dcc.Graph(id='co2-gauge')
], className="four columns"),

html.Div([
html.H3("Noise Levels"),
dcc.Graph(id='noise-gauge')
], className="four columns"),

html.Div([
html.H3("Locations Map"),
dcc.Graph(id='locations-map')
], className="four columns"),
], className="row"),

html.H2("Alerts", style={'margin-top': '30px'}),
html.Div(id='alerts-table')
])
]),

dcc.Tab(label="Detailed Analysis", children=[
html.Div([
html.Div([
html.H3("Filters"),

html.Label("Select Location/Sensor:"),
dcc.Dropdown(id='sensor-dropdown'),

html.Label("Select Measurement:"),
dcc.Dropdown(
id='measurement-dropdown',
options=[
{'label': 'Temperature', 'value': 'temperature'},
{'label': 'Humidity', 'value': 'humidity'},
{'label': 'Air Quality Index', 'value': 'air_quality_index'},
{'label': 'CO2 Level', 'value': 'co2_level'},
{'label': 'Noise Level', 'value': 'noise_level'},
{'label': 'Light Level', 'value': 'light_level'},
{'label': 'Pressure', 'value': 'pressure'},
{'label': 'Particulate Matter', 'value': 'particulate_matter'},
{'label': 'VOC Level', 'value': 'voc_level'}
],
value='temperature'
),

html.Label("Time Range:"),
dcc.DatePickerRange(
id='date-picker',
start_date=(datetime.datetime.now() - datetime.timedelta(days=7)).date(),
end_date=datetime.datetime.now().date()
),

html.Button('Apply Filters', id='apply-filters', n_clicks=0)
], className="three columns"),

html.Div([
html.H3("Historical Data"),
dcc.Graph(id='time-series-graph'),

html.H3("Daily Patterns", style={'margin-top': '30px'}),
dcc.Graph(id='daily-pattern-graph'),

html.H3("Weather Correlation", style={'margin-top': '30px'}),
dcc.Graph(id='weather-correlation-graph')
], className="nine columns")
], className="row")
]),

dcc.Tab(label="Forecasting", children=[
html.Div([
html.H2("Environmental Forecasts"),

html.Div([
html.Div([
html.Label("Select Sensor:"),
dcc.Dropdown(id='forecast-sensor-dropdown'),

html.Label("Select Measurement:"),
dcc.Dropdown(
id='forecast-measurement-dropdown',
options=[
{'label': 'Temperature', 'value': 'temperature'},
{'label': 'Humidity', 'value': 'humidity'},
{'label': 'Air Quality Index', 'value': 'air_quality_index'},
{'label': 'CO2 Level', 'value': 'co2_level'},
{'label': 'Noise Level', 'value': 'noise_level'}
],
value='temperature'
),

html.Label("Forecast Period (hours):"),
dcc.Slider(
id='forecast-period-slider',
min=12,
max=72,
value=24,
marks={i: str(i) for i in range(12, 73, 12)},
step=12
),

html.Button('Generate Forecast', id='generate-forecast', n_clicks=0)
], className="three columns"),

html.Div([
dcc.Graph(id='forecast-graph')
], className="nine columns")
], className="row")
])
])
])
], style={'padding': '20px'})
])

# Callbacks
@dash_app.callback(
[Output('sensor-dropdown', 'options'),
Output('sensor-dropdown', 'value'),
Output('forecast-sensor-dropdown', 'options'),
Output('forecast-sensor-dropdown', 'value')],
[Input('apply-filters', 'n_clicks')]
)
def update_dropdowns(n_clicks):
sensors = db.get_sensor_info()

# Create options for dropdowns
options = []
for _, sensor in sensors.iterrows():
label = f"{sensor['location_name']} - {sensor['sensor_type']}"
value = sensor['id']
options.append({'label': label, 'value': value})

return options, options[0]['value'] if options else None, options, options[0]['value'] if options else None

@dash_app.callback(
[Output('temp-gauge', 'figure'),
Output('humidity-gauge', 'figure'),
Output('aqi-gauge', 'figure'),
Output('co2-gauge', 'figure'),
Output('noise-gauge', 'figure'),
Output('last-updated', 'children')],
[Input('apply-filters', 'n_clicks')]
)
def update_gauges(n_clicks):
latest_readings = db.get_latest_readings()

# Temperature gauge
temp_data = latest_readings[['sensor_id', 'temperature', 'location_name']].dropna(subset=['temperature'])
avg_temp = temp_data['temperature'].mean() if not temp_data.empty else None

temp_fig = go.Figure()
if avg_temp is not None:
temp_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_temp,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [0, 40]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 18], 'color': "lightblue"},
{'range': [18, 22], 'color': "green"},
{'range': [22, 28], 'color': "yellow"},
{'range': [28, 40], 'color': "red"}
]
}
))
temp_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))

# Humidity gauge
humidity_data = latest_readings[['sensor_id', 'humidity', 'location_name']].dropna(subset=['humidity'])
avg_humidity = humidity_data['humidity'].mean() if not humidity_data.empty else None

humidity_fig = go.Figure()
if avg_humidity is not None:
humidity_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_humidity,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [0, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 30], 'color': "orange"},
{'range': [30, 60], 'color': "green"},
{'range': [60, 100], 'color': "lightblue"}
]
}
))
humidity_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))

# AQI gauge
aqi_data = latest_readings[['sensor_id', 'air_quality_index', 'location_name']].dropna(subset=['air_quality_index'])
avg_aqi = aqi_data['air_quality_index'].mean() if not aqi_data.empty else None

aqi_fig = go.Figure()
if avg_aqi is not None:
aqi_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_aqi,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [0, 300]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 50], 'color': "green"},
{'range': [50, 100], 'color': "yellow"},
{'range': [100, 150], 'color': "orange"},
{'range': [150, 200], 'color': "red"},
{'range': [200, 300], 'color': "purple"}
]
}
))
aqi_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))

# CO2 gauge
co2_data = latest_readings[['sensor_id', 'co2_level', 'location_name']].dropna(subset=['co2_level'])
avg_co2 = co2_data['co2_level'].mean() if not co2_data.empty else None

co2_fig = go.Figure()
if avg_co2 is not None:
co2_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_co2,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [300, 2000]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [300, 600], 'color': "green"},
{'range': [600, 1000], 'color': "yellow"},
{'range': [1000, 1500], 'color': "orange"},
{'range': [1500, 2000], 'color': "red"}
]
}
))
co2_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))

# Noise gauge
noise_data = latest_readings[['sensor_id', 'noise_level', 'location_name']].dropna(subset=['noise_level'])
avg_noise = noise_data['noise_level'].mean() if not noise_data.empty else None

noise_fig = go.Figure()
if avg_noise is not None:
noise_fig.add_trace(go.Indicator(
mode="gauge+number",
value=avg_noise,
domain={'x': [0, 1], 'y': [0, 1]},
gauge={
'axis': {'range': [20, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [20, 40], 'color': "green"},
{'range': [40, 60], 'color': "yellow"},
{'range': [60, 80], 'color': "orange"},
{'range': [80, 100], 'color': "red"}
]
}
))
noise_fig.update_layout(height=300, margin=dict(l=20, r=20, t=30, b=20))

# Last updated
last_updated = latest_readings['timestamp'].max() if not latest_readings.empty else None

if last_updated:
last_updated_text = html.P(f"Last updated: {pd.to_datetime(last_updated).strftime('%Y-%m-%d %H:%M:%S')}")
else:
last_updated_text = html.P("No data available")

return temp_fig, humidity_fig, aqi_fig, co2_fig, noise_fig, last_updated_text

@dash_app.callback(
Output('locations-map', 'figure'),
[Input('apply-filters', 'n_clicks')]
)
def update_map(n_clicks):
# Get sensor locations
sensors = db.get_sensor_info()

# Create map
fig = go.Figure()

if not sensors.empty:
# Add indoor locations (buildings)
buildings = sensors[sensors['indoor'] == True].drop_duplicates(['building', 'location_name'])

for _, building in buildings.iterrows():
fig.add_trace(go.Scattergeo(
lon=[building['longitude']],
lat=[building['latitude']],
text=f"{building['building']} - {building['location_name']}",
mode='markers',
marker=dict(
size=10,
color='blue',
symbol='square'
),
name=building['building']
))

# Add outdoor locations
outdoors = sensors[sensors['indoor'] == False].drop_duplicates(['location_name'])

if not outdoors.empty:
fig.add_trace(go.Scattergeo(
lon=outdoors['longitude'],
lat=outdoors['latitude'],
text=outdoors['location_name'],
mode='markers',
marker=dict(
size=10,
color='green',
symbol='circle'
),
name='Outdoor Sensors'
))

# Center the map
center_lat = sensors['latitude'].mean()
center_lon = sensors['longitude'].mean()

fig.update_geos(
center=dict(lat=center_lat, lon=center_lon),
projection_scale=15, # Adjust zoom level
showcoastlines=True, coastlinecolor="RebeccaPurple",
showland=True, landcolor="LightGreen",
showocean=True, oceancolor="LightBlue",
showlakes=True, lakecolor="Blue"
)

fig.update_layout(
height=300,
margin=dict(l=20, r=20, t=30, b=20),
legend=dict(
yanchor="top",
y=0.99,
xanchor="left",
x=0.01
)
)

return fig

@dash_app.callback(
Output('alerts-table', 'children'),
[Input('apply-filters', 'n_clicks')]
)
def update_alerts_table(n_clicks):
# Get recent unacknowledged alerts
alerts = db.get_alerts(
start_time=(datetime.datetime.now() - datetime.timedelta(days=1)).isoformat(),
acknowledged=False
)

if alerts.empty:
return html.P("No active alerts")

# Sort by timestamp, most recent first
alerts = alerts.sort_values('timestamp', ascending=False)

# Create table
table = html.Table(
# Header
[html.Tr([
html.Th("Time"),
html.Th("Location"),
html.Th("Type"),
html.Th("Measurement"),
html.Th("Value"),
html.Th("Threshold"),
html.Th("Action")
])] +
# Body
[html.Tr([
html.Td(pd.to_datetime(row['timestamp']).strftime('%Y-%m-%d %H:%M')),
html.Td(f"{row['building']} - {row['location_name']}"),
html.Td(row['alert_type'].capitalize()),
html.Td(row['measurement_type'].replace('_', ' ').capitalize()),
html.Td(f"{row['measurement_value']:.1f}"),
html.Td(f"{row['threshold_value']:.1f}"),
html.Td(html.Button("Acknowledge", id={'type': 'ack-button', 'index': row['id']}))
]) for _, row in alerts.iterrows()]
)

return table

@dash_app.callback(
[Output('time-series-graph', 'figure'),
Output('daily-pattern-graph', 'figure'),
Output('weather-correlation-graph', 'figure')],
[Input('apply-filters', 'n_clicks')],
[State('sensor-dropdown', 'value'),
State('measurement-dropdown', 'value'),
State('date-picker', 'start_date'),
State('date-picker', 'end_date')]
)
def update_analysis_graphs(n_clicks, sensor_id, measurement, start_date, end_date):
if not sensor_id or not measurement:
# Return empty figures
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'No data selected',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig, empty_fig, empty_fig

# Convert dates to ISO format
start_time = f"{start_date}T00:00:00"
end_time = f"{end_date}T23:59:59"

# Get sensor data
readings = db.get_readings_for_period(start_time, end_time, [sensor_id])

if readings.empty or measurement not in readings.columns or readings[measurement].isna().all():
# Return empty figures
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'No data available',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig, empty_fig, empty_fig

# Get sensor location info
sensor_info = db.get_sensor_info()
location_info = sensor_info[sensor_info['id'] == sensor_id].iloc[0]
location_name = f"{location_info['building']} - {location_info['location_name']}"

# Time series graph
time_series_fig = go.Figure()

# Add the measurement data
time_series_fig.add_trace(go.Scatter(
x=pd.to_datetime(readings['timestamp']),
y=readings[measurement],
mode='lines',
name=measurement.replace('_', ' ').title()
))

# Get threshold values for the measurement
thresholds = db.get_thresholds(location_id=location_info['location_id'])
measurement_thresholds = thresholds[thresholds['measurement_type'] == measurement]

if not measurement_thresholds.empty:
min_val = measurement_thresholds['min_value'].iloc[0]
max_val = measurement_thresholds['max_value'].iloc[0]

if min_val is not None:
time_series_fig.add_shape(
type="line",
x0=readings['timestamp'].min(),
y0=min_val,
x1=readings['timestamp'].max(),
y1=min_val,
line=dict(color="red", width=2, dash="dash")
)

if max_val is not None:
time_series_fig.add_shape(
type="line",
x0=readings['timestamp'].min(),
y0=max_val,
x1=readings['timestamp'].max(),
y1=max_val,
line=dict(color="red", width=2, dash="dash")
)

time_series_fig.update_layout(
title=f"{measurement.replace('_', ' ').title()} Over Time - {location_name}",
xaxis_title="Time",
yaxis_title=measurement.replace('_', ' ').title(),
hovermode="x unified"
)

# Daily pattern graph
pattern = analyzer.identify_patterns(readings, measurement, 'daily')

if pattern is None:
pattern_fig = go.Figure()
pattern_fig.update_layout(
annotations=[{
'text': 'Insufficient data for pattern analysis',
'showarrow': False,
'font': {'size': 20}
}]
)
else:
pattern_fig = go.Figure()

# Add mean line
pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['mean'],
mode='lines+markers',
name='Average',
line=dict(color='blue', width=2)
))

# Add range (mean ± std)
pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['mean'] + pattern['std'],
mode='lines',
line=dict(width=0),
showlegend=False
))

pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['mean'] - pattern['std'],
mode='lines',
line=dict(width=0),
fill='tonexty',
fillcolor='rgba(0, 0, 255, 0.2)',
name='Std Dev'
))

# Add min and max
pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['min'],
mode='lines',
line=dict(color='green', width=1, dash='dot'),
name='Min'
))

pattern_fig.add_trace(go.Scatter(
x=pattern['hour'],
y=pattern['max'],
mode='lines',
line=dict(color='red', width=1, dash='dot'),
name='Max'
))

pattern_fig.update_layout(
title=f"Daily Pattern - {measurement.replace('_', ' ').title()} - {location_name}",
xaxis_title="Hour of Day",
yaxis_title=measurement.replace('_', ' ').title(),
xaxis=dict(tickmode='linear', tick0=0, dtick=2),
hovermode="x unified"
)

# Weather correlation graph
weather_measurement = 'temperature' if measurement != 'temperature' else 'humidity'

# Get weather data
weather_data = db.get_external_weather(start_time, end_time)

if weather_data.empty or weather_measurement not in weather_data.columns or weather_data[weather_measurement].isna().all():
correlation_fig = go.Figure()
correlation_fig.update_layout(
annotations=[{
'text': 'No weather data available for correlation',
'showarrow': False,
'font': {'size': 20}
}]
)
else:
correlation = analyzer.correlate_with_weather(readings, weather_data, measurement, weather_measurement)

if correlation is None:
correlation_fig = go.Figure()
correlation_fig.update_layout(
annotations=[{
'text': 'Insufficient data for correlation analysis',
'showarrow': False,
'font': {'size': 20}
}]
)
else:
# Create a merged dataset for the scatter plot
sensor_data = readings[['timestamp', measurement]].copy()
sensor_data['timestamp'] = pd.to_datetime(sensor_data['timestamp'])

weather_data = weather_data[['timestamp', weather_measurement]].copy()
weather_data['timestamp'] = pd.to_datetime(weather_data['timestamp'])

# Pair each sensor reading with the nearest weather observation within one hour
merged = pd.merge_asof(
sensor_data.sort_values('timestamp'),
weather_data.sort_values('timestamp'),
on='timestamp',
direction='nearest',
tolerance=pd.Timedelta('1h')
)

# Drop rows with missing values
merged = merged.dropna(subset=[measurement, weather_measurement])

correlation_fig = go.Figure(data=go.Scatter(
x=merged[weather_measurement],
y=merged[measurement],
mode='markers',
marker=dict(
size=8,
color='blue',
opacity=0.6
)
))

# Add a best fit line
if len(merged) > 2:
# Linear regression
slope, intercept = np.polyfit(merged[weather_measurement], merged[measurement], 1)
x_range = np.linspace(merged[weather_measurement].min(), merged[weather_measurement].max(), 100)
y_range = slope * x_range + intercept

correlation_fig.add_trace(go.Scatter(
x=x_range,
y=y_range,
mode='lines',
line=dict(color='red', width=2),
name=f'Fit: y = {slope:.2f}x + {intercept:.2f}'
))

correlation_fig.update_layout(
title=f"Correlation with Weather - {correlation['correlation']:.2f} (p={correlation['p_value']:.4f})",
xaxis_title=weather_measurement.replace('_', ' ').title(),
yaxis_title=measurement.replace('_', ' ').title(),
hovermode="closest"
)

return time_series_fig, pattern_fig, correlation_fig

@dash_app.callback(
Output('forecast-graph', 'figure'),
[Input('generate-forecast', 'n_clicks')],
[State('forecast-sensor-dropdown', 'value'),
State('forecast-measurement-dropdown', 'value'),
State('forecast-period-slider', 'value')]
)
def update_forecast(n_clicks, sensor_id, measurement, periods):
if not n_clicks or not sensor_id or not measurement:
# Return empty figure
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'Click "Generate Forecast" to see predictions',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig

# Get historical data for the past 14 days
end_time = datetime.datetime.now().isoformat()
start_time = (datetime.datetime.now() - datetime.timedelta(days=14)).isoformat()

# Get sensor data
readings = db.get_readings_for_period(start_time, end_time, [sensor_id])

if readings.empty or measurement not in readings.columns or readings[measurement].isna().all():
# Return empty figure
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'No data available for forecast',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig

# Get sensor location info
sensor_info = db.get_sensor_info()
location_info = sensor_info[sensor_info['id'] == sensor_id].iloc[0]
location_name = f"{location_info['building']} - {location_info['location_name']}"

# Generate forecast
forecast = analyzer.forecast_trends(readings, measurement, periods)

if forecast is None:
# Return empty figure
empty_fig = go.Figure()
empty_fig.update_layout(
annotations=[{
'text': 'Insufficient data for forecasting',
'showarrow': False,
'font': {'size': 20}
}]
)
return empty_fig

# Create figure with both historical and forecasted data
fig = go.Figure()

# Add historical data
fig.add_trace(go.Scatter(
x=pd.to_datetime(readings['timestamp']),
y=readings[measurement],
mode='lines',
name='Historical Data',
line=dict(color='blue')
))

# Add forecasted data
forecast_col = f'forecast_{measurement}'
fig.add_trace(go.Scatter(
x=pd.to_datetime(forecast['timestamp']),
y=forecast[forecast_col],
mode='lines',
name='Forecast',
line=dict(color='red', dash='dash')
))

# Get threshold values for the measurement
thresholds = db.get_thresholds(location_id=location_info['location_id'])
measurement_thresholds = thresholds[thresholds['measurement_type'] == measurement]

if not measurement_thresholds.empty:
min_val = measurement_thresholds['min_value'].iloc[0]
max_val = measurement_thresholds['max_value'].iloc[0]

if min_val is not None:
fig.add_shape(
type="line",
x0=readings['timestamp'].min(),
y0=min_val,
x1=forecast['timestamp'].max(),
y1=min_val,
line=dict(color="orange", width=2, dash="dot")
)

if max_val is not None:
fig.add_shape(
type="line",
x0=readings['timestamp'].min(),
y0=max_val,
x1=forecast['timestamp'].max(),
y1=max_val,
line=dict(color="orange", width=2, dash="dot")
)

fig.update_layout(
title=f"{measurement.replace('_', ' ').title()} Forecast - {location_name}",
xaxis_title="Time",
yaxis_title=measurement.replace('_', ' ').title(),
hovermode="x unified"
)

# Add vertical line separating historical and forecast data
current_time = datetime.datetime.now()
fig.add_shape(
type="line",
x0=current_time,
y0=readings[measurement].min(),
x1=current_time,
y1=readings[measurement].max(),
line=dict(color="green", width=2, dash="dash")
)

fig.add_annotation(
x=current_time,
y=readings[measurement].max(),
text="Now",
showarrow=True,
arrowhead=1
)

return fig

# Alert monitoring thread
def monitor_alerts():
"""Background thread to monitor and process alerts."""
while True:
try:
# Get unprocessed alerts (last hour, unacknowledged)
alerts = db.get_alerts(
start_time=(datetime.datetime.now() - datetime.timedelta(hours=1)).isoformat(),
acknowledged=False
)

if not alerts.empty:
for _, alert in alerts.iterrows():
# Process the alert
notification_manager.process_alert(alert)

# Mark as acknowledged so we don't process it again
db.acknowledge_alert(alert['id'])

# Sleep for a while
time.sleep(60) # Check every minute

except Exception as e:
logger.error(f"Error in alert monitoring thread: {str(e)}")
time.sleep(300) # Sleep longer on error

# Start the alert monitoring thread
alert_thread = threading.Thread(target=monitor_alerts, daemon=True)
alert_thread.start()

if __name__ == '__main__':
# 'server' is the Flask app; the Dash dashboard is mounted on it at /dashboard/
server.run(debug=True, host='0.0.0.0', port=5000)
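
Once the server is running, you can exercise the REST API from a separate script. The snippet below is a minimal sketch using the requests library; the sensor id and measurement values are placeholders, and it assumes the server is reachable at localhost:5000:

import requests

# Submit a new reading for sensor 1 (hypothetical id and values)
resp = requests.post(
    'http://localhost:5000/api/sensors/1/readings',
    json={'temperature': 22.5, 'humidity': 45.0},
)
print(resp.json())

# Fetch the latest reading from every sensor
latest = requests.get('http://localhost:5000/api/readings/latest').json()
print(latest)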


Tips for Project Success {#tips}

To make the most of your Python project experience:

  1. Start with planning: Create a detailed project plan before writing any code
  2. Use version control: Set up Git from the beginning to track changes
  3. Document thoroughly: Write clear documentation as you go
  4. Test consistently: Implement unit tests to ensure functionality (see the pytest sketch after this list)
  5. Seek feedback: Share your work with peers and instructors
  6. Address real problems: Focus on solving genuine issues to stay motivated
  7. Present professionally: Create demos and presentations of your finished project
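
To put tip 4 into practice, here is a minimal unit-test sketch for the monitoring API above, using pytest and Flask's built-in test client. It assumes the application code lives in a module named monitor.py (a hypothetical name; adjust the import to match your project layout):

# test_api.py
from monitor import server

def test_latest_readings_returns_json():
    client = server.test_client()
    response = client.get('/api/readings/latest')
    assert response.status_code == 200
    assert isinstance(response.get_json(), list)

Running pytest test_api.py verifies that the endpoint answers with a JSON list even before any sensors have reported data.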

FAQs About Python Projects {#faqs}

How long should a college Python project take to complete?

The timeframe varies depending on project complexity and your experience level. Simple projects might take 2-4 weeks, while more complex ones can span an entire semester. Set realistic milestones and track your progress.

Do I need advanced Python knowledge to complete these projects?

While some projects require intermediate to advanced skills, you can modify any project to match your current skill level. Start with components you understand and gradually incorporate more complex elements as you learn.

How do I showcase these projects to potential employers?

Create a GitHub portfolio with well-documented repositories, include demonstration videos, develop case studies explaining your process, and consider hosting live demos for web-based projects. Make sure to highlight the problems solved and skills demonstrated.

Can I work on these projects in teams?

Absolutely! Many of these projects benefit from collaborative work. Divide responsibilities based on team members’ strengths and use project management tools to coordinate efforts.

What if I encounter problems I can’t solve?

This is normal and part of the learning process. Utilize online communities like Stack Overflow, GitHub Discussions, or Reddit’s r/learnpython. Don’t hesitate to consult with professors or teaching assistants for guidance.

How can I extend these projects for my thesis or capstone?

To extend these projects for advanced academic work, consider adding research components, conducting user studies, implementing more sophisticated algorithms, or combining multiple projects into larger systems with broader implications.


Conclusion

These top 10 Python projects offer excellent opportunities for college students to develop practical skills while building impressive portfolio pieces. By tackling real-world problems with Python’s powerful capabilities, you’ll position yourself for success in today’s competitive tech landscape.

Remember that the journey of completing these projects is as valuable as the final result. Each challenge you overcome strengthens your problem-solving abilities and deepens your understanding of programming concepts.

We’d love to hear about your experiences with these projects! Share your progress in the comments below or reach out with questions if you need guidance getting started.

Interested in more programming resources? Check out our complete Python learning path for beginners.


About the Author


Harshit Arya – I am a robotics and machine learning enthusiast who enjoys building innovative projects that solve real-world problems. I have worked on many Python, web, and app-based projects, and I am a quick learner and problem solver.


Tags: Python projects, college students, programming projects, computer science projects, Python programming, student portfolio, coding projects