fina/backup/first -fina app/app/routes/main.py

811 lines
28 KiB
Python
Raw Permalink Normal View History

2025-12-26 00:52:56 +00:00
from flask import Blueprint, render_template, request, redirect, url_for, flash, send_file, jsonify, current_app
from flask_login import login_required, current_user
from app import db
from app.models.category import Category, Expense
from app.models.user import Tag
from werkzeug.utils import secure_filename
import os
from datetime import datetime
from sqlalchemy import extract, func
import json
# Blueprint named 'main'; the route handlers below register on it, so their
# endpoints are addressed as 'main.<function_name>' in url_for().
bp = Blueprint('main', __name__)
def allowed_file(filename):
    """Return True if *filename* carries an extension accepted for uploads.

    The extension is the text after the last dot, compared
    case-insensitively against a fixed allow-list. A missing filename
    (``None`` or empty string) or a name without a dot is rejected instead
    of raising.
    """
    ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'pdf', 'gif'}
    # Guard first: `'.' in None` would raise TypeError.
    if not filename or '.' not in filename:
        return False
    return filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@bp.route('/')
def index():
    """Redirect to the dashboard when logged in, otherwise to the login page."""
    endpoint = 'main.dashboard' if current_user.is_authenticated else 'auth.login'
    return redirect(url_for(endpoint))
@bp.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard for the current user.

    Passes to ``dashboard.html``: the user's categories, overall spend and
    expense count, per-category chart data (categories with spend > 0 only),
    a JSON-friendly category list, the years that have expenses (newest
    first, defaulting to the current year), up to five active subscriptions
    due on or before 30 days from today, and the number of smart-detection
    suggestions.
    """
    from app.models.subscription import Subscription
    from datetime import timedelta, date
    from app.smart_detection import get_user_suggestions

    today = date.today()
    current_year = datetime.now().year

    categories = Category.query.filter_by(user_id=current_user.id).all()

    # Ask each category for its total exactly once and reuse the figure for
    # both the grand total and the chart.
    spent_by_category = [(cat, cat.get_total_spent()) for cat in categories]
    total_spent = sum(spent for _, spent in spent_by_category)
    chart_data = [
        {'name': cat.name, 'value': spent, 'color': cat.color}
        for cat, spent in spent_by_category
        if spent > 0
    ]

    total_expenses = Expense.query.filter_by(user_id=current_user.id).count()

    years_query = db.session.query(
        extract('year', Expense.date).label('year')
    ).filter(
        Expense.user_id == current_user.id
    ).distinct().all()
    available_years = sorted(
        (int(row[0]) for row in years_query if row[0]),
        reverse=True,
    )
    if not available_years:
        available_years = [current_year]

    categories_json = [
        {'id': cat.id, 'name': cat.name, 'color': cat.color}
        for cat in categories
    ]

    # Upcoming subscriptions (next 30 days). There is no lower bound on the
    # due date, so subscriptions that are already overdue are included too.
    end_date = today + timedelta(days=30)
    upcoming_subscriptions = Subscription.query.filter(
        Subscription.user_id == current_user.id,
        Subscription.is_active == True,  # noqa: E712 - SQLAlchemy column expression
        Subscription.next_due_date <= end_date
    ).order_by(Subscription.next_due_date).limit(5).all()

    suggestions_count = len(get_user_suggestions(current_user.id))

    return render_template('dashboard.html',
                           categories=categories,
                           total_spent=total_spent,
                           total_expenses=total_expenses,
                           chart_data=chart_data,
                           categories_json=categories_json,
                           available_years=available_years,
                           current_year=current_year,
                           upcoming_subscriptions=upcoming_subscriptions,
                           suggestions_count=suggestions_count,
                           today=today)
@bp.route('/api/metrics')
@login_required
def get_metrics():
    """Return chart metrics for one category or for all categories as JSON.

    Query parameters:
        category: ``'all'`` (default) or a category id owned by the user.
        year: calendar year; defaults to the current year, also when the
            supplied value is not an integer.

    The response contains ``category_name``, ``monthly_data``, ``color`` and
    the pie-chart series (``pie_data``, ``pie_labels``, ``pie_colors``),
    which always cover every category of the user. A non-numeric category
    yields HTTP 400; an unknown or foreign category id yields HTTP 404.
    """
    category_id = request.args.get('category', 'all')
    # type=int makes a malformed year fall back to the default instead of
    # raising ValueError (which surfaced as HTTP 500).
    year = request.args.get('year', datetime.now().year, type=int)

    if category_id == 'all':
        category = None
    else:
        try:
            wanted_id = int(category_id)
        except ValueError:
            return jsonify({'error': 'Invalid category'}), 400
        category = Category.query.filter_by(
            id=wanted_id,
            user_id=current_user.id
        ).first_or_404()

    categories = Category.query.filter_by(user_id=current_user.id).all()

    if category is None:
        # Sum the 12 monthly totals across every category.
        monthly_data = [0] * 12
        for cat in categories:
            cat_monthly = cat.get_monthly_totals(year)
            monthly_data = [
                running + month
                for running, month in zip(monthly_data, cat_monthly)
            ]
        category_name = 'All Categories'
        color = '#6366f1'
    else:
        monthly_data = category.get_monthly_totals(year)
        category_name = category.name
        color = category.color

    return jsonify({
        'category_name': category_name,
        'monthly_data': monthly_data,
        'color': color,
        'pie_data': [cat.get_yearly_total(year) for cat in categories],
        'pie_labels': [cat.name for cat in categories],
        'pie_colors': [cat.color for cat in categories]
    })
@bp.route('/category/create', methods=['GET', 'POST'])
@login_required
def create_category():
    """Show the category form (GET) or create a category for the user (POST).

    A POST without a name flashes an error and returns to the form; a
    successful POST redirects to the dashboard.
    """
    if request.method != 'POST':
        return render_template('create_category.html')

    form = request.form
    name = form.get('name')
    if not name:
        flash('Category name is required', 'error')
        return redirect(url_for('main.create_category'))

    new_category = Category(
        name=name,
        description=form.get('description'),
        color=form.get('color', '#6366f1'),
        user_id=current_user.id,
    )
    db.session.add(new_category)
    db.session.commit()

    flash('Category created successfully!', 'success')
    return redirect(url_for('main.dashboard'))
@bp.route('/category/<int:category_id>')
@login_required
def view_category(category_id):
    """Render one of the user's categories with its expenses, newest first.

    Responds with 404 when the category does not exist or belongs to
    another user.
    """
    owner_id = current_user.id
    category = Category.query.filter_by(id=category_id, user_id=owner_id).first_or_404()
    expenses = (
        Expense.query
        .filter_by(category_id=category_id, user_id=owner_id)
        .order_by(Expense.date.desc())
        .all()
    )
    return render_template(
        'view_category.html',
        category=category,
        expenses=expenses,
        total_spent=category.get_total_spent(),
    )
@bp.route('/category/<int:category_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_category(category_id):
    """Show the edit form (GET) or update one of the user's categories (POST).

    Validation on POST:
        * ``name`` is required, matching ``create_category``.
        * ``color`` keeps its current value when the field is absent/empty.
        * ``monthly_budget`` is stored only when it parses to a finite,
          non-negative number; anything else clears the budget (``None``).
        * ``budget_alert_threshold`` is a percentage; it is stored as a
          fraction and reset to 1.0 when unparsable or outside 0.5..2.0.

    Responds with 404 when the category is missing or owned by someone else.
    """
    import math

    category = Category.query.filter_by(id=category_id, user_id=current_user.id).first_or_404()

    if request.method != 'POST':
        return render_template('edit_category.html', category=category)

    # Validate before touching the model so a rejected form leaves it clean.
    name = request.form.get('name')
    if not name:
        flash('Category name is required', 'error')
        return redirect(url_for('main.edit_category', category_id=category.id))

    category.name = name
    category.description = request.form.get('description')
    category.color = request.form.get('color') or category.color

    # Budget settings
    budget = None
    raw_budget = request.form.get('monthly_budget', '').strip()
    if raw_budget:
        try:
            parsed = float(raw_budget)
        except ValueError:
            parsed = None
        # float() accepts "nan"/"inf"; neither is a usable budget.
        if parsed is not None and math.isfinite(parsed) and parsed >= 0:
            budget = parsed
    category.monthly_budget = budget

    # Budget alert threshold (default 100%)
    raw_threshold = request.form.get('budget_alert_threshold', '100').strip()
    try:
        threshold = float(raw_threshold) / 100
    except ValueError:
        threshold = 1.0
    # The chained comparison is False for NaN too, so NaN resets to 1.0.
    if not 0.5 <= threshold <= 2.0:
        threshold = 1.0
    category.budget_alert_threshold = threshold

    db.session.commit()
    flash('Category updated successfully!', 'success')
    return redirect(url_for('main.view_category', category_id=category.id))
@bp.route('/category/<int:category_id>/delete', methods=['POST'])
@login_required
def delete_category(category_id):
    """Delete one of the user's categories and the files attached to its expenses.

    Attachment removal is best-effort: a file that is already gone is
    ignored and any other filesystem error is logged, so the database
    delete still proceeds. Responds with 404 for a missing or foreign
    category.
    """
    category = Category.query.filter_by(id=category_id, user_id=current_user.id).first_or_404()

    for expense in category.expenses:
        if not expense.file_path:
            continue
        file_path = os.path.join(current_app.root_path, 'static', expense.file_path)
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass  # nothing to clean up
        except OSError as exc:
            current_app.logger.warning(
                'Could not remove attachment %s: %s', file_path, exc)

    db.session.delete(category)
    db.session.commit()
    flash('Category deleted successfully!', 'success')
    return redirect(url_for('main.dashboard'))
@bp.route('/expense/create/<int:category_id>', methods=['GET', 'POST'])
@login_required
def create_expense(category_id):
    """Show the expense form (GET) or add an expense to a category (POST).

    On POST the description and amount are required, the amount must be a
    finite number greater than zero, and the date (``YYYY-MM-DD``) must
    parse; when no date is sent the current UTC time is used. All form
    fields are validated before an uploaded attachment is written, so a
    rejected form does not leave a stray file behind. After the expense is
    committed, budget alerts are checked; a failure there is logged and does
    not affect the response.

    Responds with 404 when the category is missing or owned by someone else.
    """
    import math

    category = Category.query.filter_by(id=category_id, user_id=current_user.id).first_or_404()
    user_tags = Tag.query.filter_by(user_id=current_user.id).all()

    if request.method != 'POST':
        today = datetime.now().strftime('%Y-%m-%d')
        return render_template('create_expense.html', category=category, today=today, user_tags=user_tags)

    form_url = url_for('main.create_expense', category_id=category_id)

    description = request.form.get('description')
    raw_amount = request.form.get('amount')
    date_str = request.form.get('date')
    paid_by = request.form.get('paid_by')
    tags = request.form.get('tags')

    if not all([description, raw_amount]):
        flash('Description and amount are required', 'error')
        return redirect(form_url)

    try:
        amount = float(raw_amount)
    except ValueError:
        amount = None
    # float() also accepts "nan" and "inf", which must not be stored.
    if amount is None or not math.isfinite(amount) or amount <= 0:
        flash('Please enter a valid amount', 'error')
        return redirect(form_url)

    try:
        expense_date = datetime.strptime(date_str, '%Y-%m-%d') if date_str else datetime.utcnow()
    except ValueError:
        flash('Please enter a valid date', 'error')
        return redirect(form_url)

    # Only now, with the form accepted, persist the optional attachment.
    file_path = None
    upload = request.files.get('file')
    if upload and upload.filename and allowed_file(upload.filename):
        safe_name = secure_filename(upload.filename)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        stored_name = f"{current_user.id}_{timestamp}_{safe_name}"
        upload_folder = os.path.join(current_app.root_path, 'static', 'uploads')
        os.makedirs(upload_folder, exist_ok=True)
        # Stored relative to the static folder.
        file_path = os.path.join('uploads', stored_name)
        upload.save(os.path.join(current_app.root_path, 'static', file_path))

    expense = Expense(
        description=description,
        amount=amount,
        date=expense_date,
        paid_by=paid_by,
        tags=tags,
        file_path=file_path,
        category_id=category_id,
        user_id=current_user.id
    )
    db.session.add(expense)
    db.session.commit()

    # Check budget after adding expense
    from app.budget_alerts import check_budget_alerts
    try:
        check_budget_alerts()
    except Exception as e:
        current_app.logger.exception('[Budget Check] Error: %s', e)

    flash('Expense added successfully!', 'success')
    return redirect(url_for('main.view_category', category_id=category_id))
@bp.route('/expense/<int:expense_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_expense(expense_id):
    """Show the edit form (GET) or update one of the user's expenses (POST).

    On POST the description and amount are required, the amount must be a
    finite number greater than zero, and a submitted date must be in
    ``YYYY-MM-DD`` form (an absent date keeps the stored one). Invalid input
    flashes an error and returns to the form instead of raising. When a new
    allowed attachment is uploaded it is saved first and the previous file
    is then removed on a best-effort basis.

    Responds with 404 when the expense is missing or owned by someone else.
    """
    import math

    expense = Expense.query.filter_by(id=expense_id, user_id=current_user.id).first_or_404()
    user_tags = Tag.query.filter_by(user_id=current_user.id).all()

    if request.method != 'POST':
        return render_template('edit_expense.html', expense=expense, user_tags=user_tags)

    form_url = url_for('main.edit_expense', expense_id=expense_id)

    # Validate everything before mutating the model.
    description = request.form.get('description')
    raw_amount = request.form.get('amount')
    if not all([description, raw_amount]):
        flash('Description and amount are required', 'error')
        return redirect(form_url)

    try:
        amount = float(raw_amount)
    except ValueError:
        amount = None
    if amount is None or not math.isfinite(amount) or amount <= 0:
        flash('Please enter a valid amount', 'error')
        return redirect(form_url)

    date_str = request.form.get('date')
    try:
        expense_date = datetime.strptime(date_str, '%Y-%m-%d') if date_str else expense.date
    except ValueError:
        flash('Please enter a valid date', 'error')
        return redirect(form_url)

    expense.description = description
    expense.amount = amount
    expense.date = expense_date
    expense.paid_by = request.form.get('paid_by')
    expense.tags = request.form.get('tags')

    upload = request.files.get('file')
    if upload and upload.filename and allowed_file(upload.filename):
        previous_path = expense.file_path

        safe_name = secure_filename(upload.filename)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        stored_name = f"{current_user.id}_{timestamp}_{safe_name}"
        upload_folder = os.path.join(current_app.root_path, 'static', 'uploads')
        os.makedirs(upload_folder, exist_ok=True)
        file_path = os.path.join('uploads', stored_name)
        upload.save(os.path.join(current_app.root_path, 'static', file_path))
        expense.file_path = file_path

        # Remove the old attachment only after the new one is on disk, so a
        # failed save cannot leave the expense without any file.
        if previous_path and previous_path != file_path:
            old_file = os.path.join(current_app.root_path, 'static', previous_path)
            try:
                os.remove(old_file)
            except FileNotFoundError:
                pass
            except OSError as exc:
                current_app.logger.warning(
                    'Could not remove old attachment %s: %s', old_file, exc)

    db.session.commit()
    flash('Expense updated successfully!', 'success')
    return redirect(url_for('main.view_category', category_id=expense.category_id))
@bp.route('/expense/<int:expense_id>/delete', methods=['POST'])
@login_required
def delete_expense(expense_id):
    """Delete one of the user's expenses together with its attachment.

    Attachment removal is best-effort: a missing file is ignored and other
    filesystem errors are logged without blocking the database delete.
    Responds with 404 when the expense is missing or owned by someone else.
    """
    expense = Expense.query.filter_by(id=expense_id, user_id=current_user.id).first_or_404()
    # Remember the category now; it is needed for the redirect after delete.
    category_id = expense.category_id

    if expense.file_path:
        file_path = os.path.join(current_app.root_path, 'static', expense.file_path)
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass
        except OSError as exc:
            current_app.logger.warning(
                'Could not remove attachment %s: %s', file_path, exc)

    db.session.delete(expense)
    db.session.commit()
    flash('Expense deleted successfully!', 'success')
    return redirect(url_for('main.view_category', category_id=category_id))
@bp.route('/expense/<int:expense_id>/download')
@login_required
def download_file(expense_id):
    """Send the attachment of one of the user's expenses as a download.

    Flashes an error and returns to the category page when the expense has
    no attachment or the file is missing on disk; 404 for an unknown or
    foreign expense.
    """
    expense = Expense.query.filter_by(id=expense_id, user_id=current_user.id).first_or_404()

    error = None
    absolute_path = None
    if not expense.file_path:
        error = 'No file attached to this expense'
    else:
        # Attachment paths are stored relative to the app's static folder.
        absolute_path = os.path.join(current_app.root_path, 'static', expense.file_path)
        if not os.path.exists(absolute_path):
            error = 'File not found'

    if error is not None:
        flash(error, 'error')
        return redirect(url_for('main.view_category', category_id=expense.category_id))

    return send_file(absolute_path, as_attachment=True)
@bp.route('/expense/<int:expense_id>/view')
@login_required
def view_file(expense_id):
    """Serve the attachment of one of the user's expenses for inline viewing.

    Flashes an error and returns to the category page when the expense has
    no attachment or the file is missing on disk; 404 for an unknown or
    foreign expense.
    """
    expense = Expense.query.filter_by(id=expense_id, user_id=current_user.id).first_or_404()

    error = None
    absolute_path = None
    if not expense.file_path:
        error = 'No file attached to this expense'
    else:
        absolute_path = os.path.join(current_app.root_path, 'static', expense.file_path)
        if not os.path.exists(absolute_path):
            error = 'File not found'

    if error is not None:
        flash(error, 'error')
        return redirect(url_for('main.view_category', category_id=expense.category_id))

    # as_attachment=False lets the browser render the file in place.
    return send_file(absolute_path, as_attachment=False)
@bp.route('/api/ocr/process', methods=['POST'])
@login_required
def process_receipt_ocr():
    """
    Process an uploaded receipt image with OCR and return the extracted data as JSON.

    The upload is written to a ``temp_``-prefixed file in the uploads folder.
    On success the response carries ``success``, ``amount``, ``merchant``,
    ``confidence``, ``temp_file`` and, when a date was extracted, ``date``
    (``YYYY-MM-DD``); the temp file is kept. Responds 400 for a missing,
    unnamed, disallowed or invalid file and 500 for any other failure, in
    which case the temp file (if already written) is removed.
    """
    if 'file' not in request.files:
        return jsonify({'success': False, 'error': 'No file uploaded'}), 400

    file = request.files['file']
    if not file or not file.filename:
        return jsonify({'success': False, 'error': 'No file selected'}), 400
    if not allowed_file(file.filename):
        return jsonify({'success': False, 'error': 'Invalid file type'}), 400

    # Bound before the try block: the handler below inspects it, and a
    # failure ahead of the assignment (e.g. the OCR import) would otherwise
    # raise UnboundLocalError and mask the real error.
    temp_path = None
    try:
        from app.ocr import extract_receipt_data, is_valid_receipt_image

        # Save temp file
        filename = secure_filename(file.filename)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        temp_filename = f"temp_{current_user.id}_{timestamp}_{filename}"
        upload_folder = os.path.join(current_app.root_path, 'static', 'uploads')
        os.makedirs(upload_folder, exist_ok=True)
        temp_path = os.path.join(upload_folder, temp_filename)
        file.save(temp_path)

        # Validate image
        is_valid, message = is_valid_receipt_image(temp_path)
        if not is_valid:
            os.remove(temp_path)
            return jsonify({'success': False, 'error': message}), 400

        # Extract data with OCR
        extracted_data = extract_receipt_data(temp_path)

        # Format response
        response = {
            'success': extracted_data['success'],
            'amount': extracted_data['amount'],
            'merchant': extracted_data['merchant'],
            'confidence': extracted_data['confidence'],
            'temp_file': temp_filename
        }
        if extracted_data['date']:
            response['date'] = extracted_data['date'].strftime('%Y-%m-%d')

        # Don't delete temp file - will be used if user confirms
        return jsonify(response)
    except Exception as e:
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/predictions')
@login_required
def predictions():
    """Render the spending-predictions page for the current user.

    Uses a three-month forecast and insights generated from its
    ``by_category`` entry and the current time.
    """
    from app.predictions import get_spending_predictions, generate_insights

    forecast = get_spending_predictions(current_user.id, months_ahead=3)
    insights = generate_insights(forecast['by_category'], datetime.now())

    return render_template(
        'predictions.html',
        predictions=forecast,
        insights=insights,
    )
@bp.route('/api/predictions')
@login_required
def api_predictions():
    """Return spending predictions for the current user as JSON.

    The ``months`` query parameter (default 3) must lie between 1 and 12;
    otherwise the response is HTTP 400 with an error message.
    """
    from app.predictions import get_spending_predictions

    months_ahead = request.args.get('months', 3, type=int)
    if not 1 <= months_ahead <= 12:
        return jsonify({'error': 'months must be between 1 and 12'}), 400

    return jsonify(get_spending_predictions(current_user.id, months_ahead))
@bp.route('/api/predictions/category/<int:category_id>')
@login_required
def api_category_forecast(category_id):
    """Return a six-month forecast for one of the user's categories as JSON.

    Responds with a JSON 404 when the category does not exist or belongs to
    another user.
    """
    from app.predictions import get_category_forecast

    # Filtering on user_id keeps other users' categories out of reach.
    category = Category.query.filter_by(id=category_id, user_id=current_user.id).first()
    if category is None:
        return jsonify({'error': 'Category not found'}), 404

    payload = {
        'category': category.name,
        'forecast': get_category_forecast(category, months=6),
    }
    return jsonify(payload)
@bp.route('/api/search')
@login_required
def api_search():
    """Global search endpoint returning JSON.

    The trimmed ``q`` parameter must be at least two characters long;
    shorter queries get ``success: False`` with an empty result skeleton.
    Otherwise up to 50 results scoped to the current user are returned.
    """
    from app.search import search_all

    query = request.args.get('q', '').strip()

    # An empty query has length 0, so one length check covers both cases.
    if len(query) < 2:
        empty_results = {
            'expenses': [],
            'categories': [],
            'subscriptions': [],
            'tags': [],
            'total': 0,
        }
        return jsonify({
            'success': False,
            'error': 'Query must be at least 2 characters',
            'results': empty_results,
        })

    # Results are restricted to the current user's data.
    matches = search_all(query, current_user.id, limit=50)
    return jsonify({'success': True, 'results': matches})
@bp.route('/api/search/suggestions')
@login_required
def api_search_suggestions():
    """Return up to five autocomplete suggestions for ``q`` as JSON.

    Queries shorter than two characters (after trimming) yield an empty list.
    """
    from app.search import quick_search_suggestions

    query = request.args.get('q', '').strip()
    if len(query) < 2:
        suggestions = []
    else:
        suggestions = quick_search_suggestions(query, current_user.id, limit=5)
    return jsonify({'suggestions': suggestions})
@bp.route('/search')
@login_required
def search_page():
    """Render the search results page.

    With an empty ``q`` the page is rendered without results; otherwise up
    to 100 results scoped to the current user are shown.
    """
    from app.search import search_all

    query = request.args.get('q', '').strip()
    results = search_all(query, current_user.id, limit=100) if query else None
    return render_template('search.html', results=results, query=query if query else '')
@bp.route('/bank-import', methods=['GET', 'POST'])
@login_required
def bank_import():
    """Bank statement import page.

    GET renders the upload form with the user's categories. POST stores the
    uploaded statement under ``static/uploads/temp`` with a
    ``bank_<user id>_<timestamp>_`` prefix and redirects to the review step.
    """
    if request.method == 'GET':
        # Categories are offered for mapping transactions.
        user_categories = Category.query.filter_by(user_id=current_user.id).all()
        return render_template('bank_import.html', categories=user_categories)

    # POST: keep the upload on disk temporarily, then hand over to review.
    if 'file' not in request.files:
        flash('No file uploaded', 'error')
        return redirect(url_for('main.bank_import'))

    upload = request.files['file']
    if not upload or not upload.filename:
        flash('No file selected', 'error')
        return redirect(url_for('main.bank_import'))

    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    safe_name = secure_filename(upload.filename)
    temp_filename = f"bank_{current_user.id}_{stamp}_{safe_name}"

    temp_folder = os.path.join(current_app.root_path, 'static', 'uploads', 'temp')
    os.makedirs(temp_folder, exist_ok=True)
    upload.save(os.path.join(temp_folder, temp_filename))

    return redirect(url_for('main.bank_import_review', filename=temp_filename))
@bp.route('/bank-import/review/<filename>')
@login_required
def bank_import_review(filename):
    """Review parsed transactions before importing.

    ``filename`` must be a bare file name carrying the current user's
    ``bank_<user id>_`` prefix and must exist in the temp upload folder;
    otherwise an error is flashed and the user is sent back to the upload
    page. On a parse failure or any other error the temp file is removed
    (best-effort) and the user is redirected likewise. On success the
    filename is remembered in the session for the confirm step and the
    review template is rendered.
    """
    from flask import session
    from app.bank_import import parse_bank_statement

    # Security: the name comes from the URL. Require the user's own prefix
    # and refuse anything containing a path component.
    if (not filename.startswith(f"bank_{current_user.id}_")
            or os.path.basename(filename) != filename):
        flash('Invalid file', 'error')
        return redirect(url_for('main.bank_import'))

    temp_folder = os.path.join(current_app.root_path, 'static', 'uploads', 'temp')
    temp_path = os.path.join(temp_folder, filename)

    if not os.path.exists(temp_path):
        flash('File not found. Please upload again.', 'error')
        return redirect(url_for('main.bank_import'))

    def _discard_temp_file():
        """Remove the temp upload; log instead of failing the request."""
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass
        except OSError as exc:
            current_app.logger.warning(
                'Could not remove temp file %s: %s', temp_path, exc)

    try:
        # Read file
        with open(temp_path, 'rb') as f:
            file_content = f.read()

        # Parse bank statement
        result = parse_bank_statement(file_content, filename)
        if not result['success']:
            flash(f"Parsing failed: {result.get('error', 'Unknown error')}", 'error')
            _discard_temp_file()
            return redirect(url_for('main.bank_import'))

        # Get user's categories
        categories = Category.query.filter_by(user_id=current_user.id).all()

        # Store temp filename in session for confirmation
        session['bank_import_file'] = filename

        return render_template('bank_import_review.html',
                               transactions=result['transactions'],
                               total_found=result['total_found'],
                               categories=categories,
                               bank_format=result.get('bank_format', 'Unknown'),
                               parse_errors=result.get('parse_errors', []))
    except Exception as e:
        flash(f'Error processing file: {str(e)}', 'error')
        _discard_temp_file()
        return redirect(url_for('main.bank_import'))
@bp.route('/bank-import/confirm', methods=['POST'])
@login_required
def bank_import_confirm():
    """Confirm and import the selected transactions of a reviewed statement.

    The temp filename is taken from the session and must carry the current
    user's ``bank_<user id>_`` prefix. The form supplies
    ``selected_transactions`` (transaction indices) and one
    ``category_<index>`` field per selection.

    Handling of individual selections:
        * a non-integer or out-of-range index is ignored;
        * a selection without a category, or with a category id that is not
          an integer or not owned by the current user, is counted as skipped;
        * a transaction matching an existing expense (same user, date,
          amount and first 50 characters of the description) is skipped.

    After a successful commit the session key is cleared and the temp file
    removed (best-effort). Any exception rolls the session back, flashes the
    error and redirects to the upload page.
    """
    from flask import session

    # Get temp filename from session
    filename = session.get('bank_import_file')
    if not filename or not filename.startswith(f"bank_{current_user.id}_"):
        flash('Invalid session. Please try again.', 'error')
        return redirect(url_for('main.bank_import'))

    temp_folder = os.path.join(current_app.root_path, 'static', 'uploads', 'temp')
    temp_path = os.path.join(temp_folder, filename)

    selected_indices = request.form.getlist('selected_transactions')
    if not selected_indices:
        flash('No transactions selected for import', 'warning')
        return redirect(url_for('main.bank_import_review', filename=filename))

    # Category ids arrive from the form, so only accept ones the user owns.
    owned_category_ids = {
        cat.id for cat in Category.query.filter_by(user_id=current_user.id).all()
    }

    # (transaction index, category id or None) per well-formed selection.
    selections = []
    for raw_idx in selected_indices:
        try:
            idx = int(raw_idx)
        except ValueError:
            continue  # malformed index: treat like an out-of-range one
        raw_category = request.form.get(f'category_{raw_idx}')
        try:
            category_id = int(raw_category) if raw_category else None
        except ValueError:
            category_id = None
        if category_id not in owned_category_ids:
            category_id = None
        selections.append((idx, category_id))

    try:
        # Re-parse to get transactions
        with open(temp_path, 'rb') as f:
            file_content = f.read()

        from app.bank_import import parse_bank_statement
        result = parse_bank_statement(file_content, filename)
        if not result['success']:
            raise Exception('Re-parsing failed')

        transactions = result['transactions']
        imported_count = 0
        skipped_count = 0

        for idx, category_id in selections:
            # The lower bound matters: a negative index would otherwise
            # silently address a transaction from the end of the list.
            if not 0 <= idx < len(transactions):
                continue
            if not category_id:
                skipped_count += 1
                continue

            trans = transactions[idx]

            # Check if transaction already exists (deduplication).
            # NOTE(review): the lookup compares the first 50 characters of
            # the description for equality while the full description is
            # stored below, so longer descriptions may never match unless
            # the column truncates - confirm against the Expense model.
            existing = Expense.query.filter_by(
                user_id=current_user.id,
                date=trans['date'],
                amount=trans['amount'],
                description=trans['description'][:50]
            ).first()
            if existing:
                skipped_count += 1
                continue

            # Create expense
            expense = Expense(
                description=trans['description'],
                amount=trans['amount'],
                date=datetime.combine(trans['date'], datetime.min.time()),
                category_id=category_id,
                user_id=current_user.id,
                tags='imported, bank-statement'
            )
            db.session.add(expense)
            imported_count += 1

        db.session.commit()

        # The import is done: forget the file even if removing it fails, so
        # the same statement cannot be confirmed twice from this session.
        session.pop('bank_import_file', None)
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass
        except OSError as exc:
            current_app.logger.warning(
                'Could not remove temp file %s: %s', temp_path, exc)

        if imported_count > 0:
            flash(f'Successfully imported {imported_count} transactions!', 'success')
            if skipped_count > 0:
                flash(f'{skipped_count} transactions were skipped (duplicates or no category)', 'info')
        else:
            flash('No transactions were imported', 'warning')

        return redirect(url_for('main.dashboard'))
    except Exception as e:
        db.session.rollback()
        flash(f'Import failed: {str(e)}', 'error')
        return redirect(url_for('main.bank_import'))
@bp.route('/api/bank-import/parse', methods=['POST'])
@login_required
def api_bank_import_parse():
    """API endpoint for parsing a bank statement (AJAX).

    Responds 400 when no file is uploaded or selected, or when the parser
    reports failure (its result is returned as the body); 500 with the
    error text on any exception. On success the parser result is returned
    with each transaction's ``date`` converted to an ISO string.
    """
    if 'file' not in request.files:
        return jsonify({'success': False, 'error': 'No file uploaded'}), 400

    upload = request.files['file']
    if not upload or not upload.filename:
        return jsonify({'success': False, 'error': 'No file selected'}), 400

    try:
        from app.bank_import import parse_bank_statement

        raw_bytes = upload.read()
        safe_name = secure_filename(upload.filename)
        parsed = parse_bank_statement(raw_bytes, safe_name)

        if not parsed['success']:
            return jsonify(parsed), 400

        # Date objects are not JSON serialisable; send ISO strings instead.
        for transaction in parsed['transactions']:
            transaction['date'] = transaction['date'].isoformat()

        return jsonify(parsed)
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500