Initial commit: Masina-Dock Vehicle Management System

This commit is contained in:
Iulian 2025-10-19 11:10:11 +01:00
commit ae923e2c41
4999 changed files with 1607266 additions and 0 deletions

389
backend/auth.py Normal file
View file

@ -0,0 +1,389 @@
from flask import Blueprint, request, jsonify, current_app
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from models import db, User
from werkzeug.security import check_password_hash
import pyotp
import qrcode
import io
import base64
import re
from datetime import datetime
auth_bp = Blueprint('auth', __name__, url_prefix='/api/auth')
login_manager = LoginManager()
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
@login_manager.unauthorized_handler
def unauthorized():
return jsonify({'error': 'Unauthorized access'}), 401
def validate_email(email):
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def validate_password(password):
if len(password) < 8:
return False, "Password must be at least 8 characters long"
if not re.search(r'[A-Z]', password):
return False, "Password must contain at least one uppercase letter"
if not re.search(r'[a-z]', password):
return False, "Password must contain at least one lowercase letter"
if not re.search(r'[0-9]', password):
return False, "Password must contain at least one number"
return True, "Valid"
def send_verification_email(user, token):
if not current_app.config.get('ENABLE_EMAIL_VERIFICATION'):
return
if not current_app.config.get('MAIL_USERNAME'):
return
try:
from flask_mail import Message, Mail
mail = Mail(current_app)
msg = Message(
'Verify your email for Masina-Dock',
recipients=[user.email]
)
msg.body = f'''Hello {user.username},
Please verify your email address by clicking the link below:
http://localhost:5000/verify-email?token={token}
If you did not create an account, please ignore this email.
Best regards,
Masina-Dock Team
'''
mail.send(msg)
except Exception as e:
print(f"Failed to send email: {e}")
@auth_bp.route('/register', methods=['POST'])
def register():
try:
if current_app.config.get('DISABLE_SIGNUPS'):
return jsonify({'error': 'Registrations are currently disabled'}), 403
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
if not data.get('username') or not data.get('email') or not data.get('password'):
return jsonify({'error': 'Missing required fields'}), 400
if User.query.filter_by(username=data['username']).first():
return jsonify({'error': 'Username already exists'}), 400
if User.query.filter_by(email=data['email']).first():
return jsonify({'error': 'Email already registered'}), 400
if not validate_email(data['email']):
return jsonify({'error': 'Invalid email format'}), 400
is_valid, message = validate_password(data['password'])
if not is_valid:
return jsonify({'error': message}), 400
is_first_user = User.query.count() == 0
user = User(
username=data['username'],
email=data['email'],
is_admin=is_first_user,
language='en',
unit_system='metric',
currency='USD',
email_verified=not current_app.config.get('ENABLE_EMAIL_VERIFICATION', False)
)
user.set_password(data['password'])
if current_app.config.get('ENABLE_EMAIL_VERIFICATION', False):
token = user.generate_email_verification_token()
send_verification_email(user, token)
db.session.add(user)
db.session.commit()
message = 'User registered successfully'
if current_app.config.get('ENABLE_EMAIL_VERIFICATION', False):
message += '. Please check your email to verify your account.'
return jsonify({'message': message}), 201
except Exception as e:
print(f"Registration error: {e}")
db.session.rollback()
return jsonify({'error': f'Registration failed: {str(e)}'}), 500
@auth_bp.route('/verify-email', methods=['POST'])
def verify_email():
data = request.get_json()
token = data.get('token')
if not token:
return jsonify({'error': 'Verification token required'}), 400
user = User.query.filter_by(email_verification_token=token).first()
if not user:
return jsonify({'error': 'Invalid or expired verification token'}), 400
if user.verify_email(token):
db.session.commit()
return jsonify({'message': 'Email verified successfully'}), 200
return jsonify({'error': 'Verification failed'}), 400
@auth_bp.route('/login', methods=['POST'])
def login():
try:
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
if not data.get('username') or not data.get('password'):
return jsonify({'error': 'Missing username or password'}), 400
user = User.query.filter_by(username=data['username']).first()
if not user or not user.check_password(data['password']):
return jsonify({'error': 'Invalid username or password'}), 401
if current_app.config.get('ENABLE_EMAIL_VERIFICATION', False) and not user.email_verified:
return jsonify({'error': 'Please verify your email before logging in'}), 403
if user.two_factor_enabled:
return jsonify({
'requires_2fa': True,
'user_id': user.id
}), 200
login_user(user, remember=True)
user.last_login = datetime.utcnow()
db.session.commit()
return jsonify({
'message': 'Login successful',
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'theme': user.theme,
'first_login': user.first_login,
'must_change_credentials': user.must_change_credentials,
'language': user.language,
'unit_system': user.unit_system,
'currency': user.currency,
'photo': user.photo
}
}), 200
except Exception as e:
print(f"Login error: {e}")
return jsonify({'error': f'Login failed: {str(e)}'}), 500
@auth_bp.route('/verify-2fa', methods=['POST'])
def verify_2fa():
data = request.get_json()
user_id = data.get('user_id')
code = data.get('code')
if not user_id or not code:
return jsonify({'error': 'Missing required fields'}), 400
user = User.query.get(user_id)
if not user or not user.two_factor_enabled:
return jsonify({'error': 'Invalid request'}), 400
totp = pyotp.TOTP(user.two_factor_secret)
if totp.verify(code) or user.verify_backup_code(code):
login_user(user, remember=True)
user.last_login = datetime.utcnow()
db.session.commit()
return jsonify({
'message': 'Login successful',
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'theme': user.theme,
'first_login': user.first_login,
'must_change_credentials': user.must_change_credentials,
'language': user.language,
'unit_system': user.unit_system,
'currency': user.currency,
'photo': user.photo
}
}), 200
return jsonify({'error': 'Invalid 2FA code'}), 401
@auth_bp.route('/setup-2fa', methods=['POST'])
@login_required
def setup_2fa():
if current_user.two_factor_enabled:
return jsonify({'error': '2FA is already enabled'}), 400
secret = pyotp.random_base32()
current_user.two_factor_secret = secret
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(
name=current_user.email,
issuer_name='Masina-Dock'
)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = io.BytesIO()
img.save(buffer, format='PNG')
qr_code = base64.b64encode(buffer.getvalue()).decode()
backup_codes = current_user.generate_backup_codes()
db.session.commit()
return jsonify({
'secret': secret,
'qr_code': f'data:image/png;base64,{qr_code}',
'backup_codes': backup_codes
}), 200
@auth_bp.route('/enable-2fa', methods=['POST'])
@login_required
def enable_2fa():
data = request.get_json()
code = data.get('code')
if not code:
return jsonify({'error': 'Verification code required'}), 400
if not current_user.two_factor_secret:
return jsonify({'error': 'Please setup 2FA first'}), 400
totp = pyotp.TOTP(current_user.two_factor_secret)
if totp.verify(code):
current_user.two_factor_enabled = True
db.session.commit()
return jsonify({'message': '2FA enabled successfully'}), 200
return jsonify({'error': 'Invalid verification code'}), 400
@auth_bp.route('/disable-2fa', methods=['POST'])
@login_required
def disable_2fa():
data = request.get_json()
password = data.get('password')
if not password:
return jsonify({'error': 'Password required'}), 400
if not current_user.check_password(password):
return jsonify({'error': 'Invalid password'}), 401
current_user.two_factor_enabled = False
current_user.two_factor_secret = None
current_user.backup_codes = None
db.session.commit()
return jsonify({'message': '2FA disabled successfully'}), 200
@auth_bp.route('/logout', methods=['POST'])
@login_required
def logout():
logout_user()
return jsonify({'message': 'Logout successful'}), 200
@auth_bp.route('/me', methods=['GET'])
@login_required
def get_current_user():
return jsonify({
'id': current_user.id,
'username': current_user.username,
'email': current_user.email,
'theme': current_user.theme,
'language': current_user.language,
'unit_system': current_user.unit_system,
'currency': current_user.currency,
'photo': current_user.photo,
'must_change_credentials': current_user.must_change_credentials,
'email_verified': current_user.email_verified,
'two_factor_enabled': current_user.two_factor_enabled
}), 200
@auth_bp.route('/update-credentials', methods=['POST'])
@login_required
def update_credentials():
data = request.get_json()
new_username = data.get('username')
new_email = data.get('email')
new_password = data.get('password')
if not new_username or not new_email or not new_password:
return jsonify({'error': 'Missing required fields'}), 400
if new_username != current_user.username:
existing_user = User.query.filter_by(username=new_username).first()
if existing_user:
return jsonify({'error': 'Username already taken'}), 400
current_user.username = new_username
if new_email != current_user.email:
if not validate_email(new_email):
return jsonify({'error': 'Invalid email format'}), 400
existing_email = User.query.filter_by(email=new_email).first()
if existing_email:
return jsonify({'error': 'Email already registered'}), 400
current_user.email = new_email
if current_app.config.get('ENABLE_EMAIL_VERIFICATION', False):
current_user.email_verified = False
token = current_user.generate_email_verification_token()
send_verification_email(current_user, token)
is_valid, message = validate_password(new_password)
if not is_valid:
return jsonify({'error': message}), 400
current_user.set_password(new_password)
current_user.must_change_credentials = False
current_user.first_login = False
db.session.commit()
return jsonify({'message': 'Credentials updated successfully'}), 200
@auth_bp.route('/change-password', methods=['POST'])
@login_required
def change_password():
data = request.get_json()
if not current_user.check_password(data['current_password']):
return jsonify({'error': 'Current password is incorrect'}), 400
is_valid, message = validate_password(data['new_password'])
if not is_valid:
return jsonify({'error': message}), 400
current_user.set_password(data['new_password'])
current_user.first_login = False
db.session.commit()
return jsonify({'message': 'Password changed successfully'}), 200