# 389 lines · 13 KiB · Python
from flask import Blueprint, request, jsonify, current_app
|
|
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
|
|
from models import db, User
|
|
from werkzeug.security import check_password_hash
|
|
import pyotp
|
|
import qrcode
|
|
import io
|
|
import base64
|
|
import re
|
|
from datetime import datetime
|
|
|
|
# Blueprint for the authentication endpoints; its routes are served under the /api/auth URL prefix.
auth_bp = Blueprint('auth', __name__, url_prefix='/api/auth')
|
|
# Flask-Login manager on which this module registers its user_loader and unauthorized_handler callbacks.
# NOTE(review): it is not bound to an application here — presumably init_app() is called elsewhere; confirm.
login_manager = LoginManager()
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Resolve the user id stored in the session to a ``User`` record.

    Args:
        user_id: Identifier handed over by Flask-Login (normally a string).

    Returns:
        The result of ``User.query.get`` for the integer id, or ``None``
        when ``user_id`` cannot be converted to an integer.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        # Flask-Login expects None (not an exception) for an invalid id,
        # e.g. a stale or tampered session cookie.
        return None
    return User.query.get(primary_key)
|
|
|
|
@login_manager.unauthorized_handler
def unauthorized():
    """Answer requests rejected by Flask-Login with a JSON error and HTTP 401."""
    error_body = jsonify({'error': 'Unauthorized access'})
    return error_body, 401
|
|
|
|
def validate_email(email):
    """Check whether *email* looks like ``local@domain.tld``.

    Args:
        email: Candidate address; anything that is not a ``str`` is rejected.

    Returns:
        ``True`` when the whole string matches the address pattern,
        ``False`` otherwise.
    """
    if not isinstance(email, str):
        return False
    # fullmatch() instead of match() + '$': '$' also matches just before a
    # trailing newline, which would let "user@example.com\n" through.
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    return re.fullmatch(pattern, email) is not None
|
|
|
|
def validate_password(password):
    """Check *password* against the password policy.

    The policy is: at least 8 characters, at least one ASCII uppercase
    letter, one ASCII lowercase letter and one ASCII digit.

    Args:
        password: Candidate password. Non-string values (for example a JSON
            number or ``null``) are rejected instead of raising.

    Returns:
        A ``(is_valid, message)`` tuple; ``message`` is ``"Valid"`` on
        success and the first violated rule otherwise.
    """
    if not isinstance(password, str):
        return False, "Password must be a string"

    if len(password) < 8:
        return False, "Password must be at least 8 characters long"

    # Checked in this order so the first failing rule determines the message.
    character_rules = (
        (r'[A-Z]', "Password must contain at least one uppercase letter"),
        (r'[a-z]', "Password must contain at least one lowercase letter"),
        (r'[0-9]', "Password must contain at least one number"),
    )
    for pattern, failure_message in character_rules:
        if not re.search(pattern, password):
            return False, failure_message

    return True, "Valid"
|
|
|
|
def send_verification_email(user, token):
    """Send the e-mail verification link to *user* (best effort).

    Nothing is sent unless both ``ENABLE_EMAIL_VERIFICATION`` and
    ``MAIL_USERNAME`` are set in the application config. Any failure while
    building or sending the message is logged and swallowed so that the
    calling request is not aborted by a mail problem.

    Args:
        user: Object providing ``username`` and ``email`` attributes.
        token: Verification token embedded in the link.

    Returns:
        None.
    """
    config = current_app.config
    if not config.get('ENABLE_EMAIL_VERIFICATION'):
        return
    if not config.get('MAIL_USERNAME'):
        return

    # The link host used to be hard-coded. APP_BASE_URL lets a deployment
    # override it; the default keeps the previous behaviour.
    base_url = str(config.get('APP_BASE_URL') or 'http://localhost:5000').rstrip('/')
    verification_link = f'{base_url}/verify-email?token={token}'

    try:
        from flask_mail import Message, Mail
        mail = Mail(current_app)

        msg = Message(
            'Verify your email for Masina-Dock',
            recipients=[user.email],
        )
        msg.body = (
            f'Hello {user.username},\n'
            '\n'
            'Please verify your email address by clicking the link below:\n'
            '\n'
            f'{verification_link}\n'
            '\n'
            'If you did not create an account, please ignore this email.\n'
            '\n'
            'Best regards,\n'
            'Masina-Dock Team\n'
        )
        mail.send(msg)
    except Exception:
        # Deliberately best effort: record the traceback in the app log
        # instead of printing to stdout, and do not re-raise.
        current_app.logger.exception('Failed to send email')
|
|
|
|
@auth_bp.route('/register', methods=['POST'])
def register():
    """Create a new user account from a JSON body.

    Expected JSON keys: ``username``, ``email`` and ``password`` (strings).

    Returns:
        A ``(response, status)`` tuple:
        201 on success,
        400 for a missing/invalid body, duplicate username/email, bad e-mail
        format or a password policy violation,
        403 when ``DISABLE_SIGNUPS`` is set,
        500 when an unexpected error occurs (details go to the app log only).
    """
    try:
        if current_app.config.get('DISABLE_SIGNUPS'):
            return jsonify({'error': 'Registrations are currently disabled'}), 403

        # silent=True: a malformed or non-JSON body yields None instead of
        # raising an HTTP error that the broad handler below would turn
        # into a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        username = data.get('username')
        email = data.get('email')
        password = data.get('password')

        if not username or not email or not password:
            return jsonify({'error': 'Missing required fields'}), 400

        # JSON numbers/lists/objects would otherwise crash the validators.
        if not all(isinstance(value, str) for value in (username, email, password)):
            return jsonify({'error': 'Username, email and password must be strings'}), 400

        if User.query.filter_by(username=username).first():
            return jsonify({'error': 'Username already exists'}), 400

        if User.query.filter_by(email=email).first():
            return jsonify({'error': 'Email already registered'}), 400

        if not validate_email(email):
            return jsonify({'error': 'Invalid email format'}), 400

        is_valid, message = validate_password(password)
        if not is_valid:
            return jsonify({'error': message}), 400

        verification_enabled = current_app.config.get('ENABLE_EMAIL_VERIFICATION', False)

        # The very first account in the database becomes the administrator.
        is_first_user = User.query.count() == 0

        user = User(
            username=username,
            email=email,
            is_admin=is_first_user,
            language='en',
            unit_system='metric',
            currency='USD',
            email_verified=not verification_enabled,
        )
        user.set_password(password)

        token = None
        if verification_enabled:
            token = user.generate_email_verification_token()

        db.session.add(user)
        db.session.commit()

        # Mail only after the commit succeeded, so a link is never sent for
        # an account (and token) that was not actually stored.
        if verification_enabled:
            send_verification_email(user, token)

        message = 'User registered successfully'
        if verification_enabled:
            message += '. Please check your email to verify your account.'

        return jsonify({'message': message}), 201

    except Exception:
        db.session.rollback()
        # Keep exception details in the server log; do not expose them to
        # the client.
        current_app.logger.exception('Registration error')
        return jsonify({'error': 'Registration failed'}), 500
|
|
|
|
@auth_bp.route('/verify-email', methods=['POST'])
def verify_email():
    """Confirm a user's e-mail address from a verification token.

    Expected JSON key: ``token`` (string).

    Returns:
        A ``(response, status)`` tuple: 200 when ``user.verify_email(token)``
        succeeds (the change is committed), otherwise 400 with an error
        message.
    """
    # silent=True plus the isinstance check: a missing, malformed or
    # non-object body is treated as "no token" instead of raising
    # AttributeError on None.get().
    data = request.get_json(silent=True)
    token = data.get('token') if isinstance(data, dict) else None

    if not token or not isinstance(token, str):
        return jsonify({'error': 'Verification token required'}), 400

    user = User.query.filter_by(email_verification_token=token).first()
    if not user:
        return jsonify({'error': 'Invalid or expired verification token'}), 400

    if user.verify_email(token):
        db.session.commit()
        return jsonify({'message': 'Email verified successfully'}), 200

    return jsonify({'error': 'Verification failed'}), 400
|
|
|
|
# Session key remembering which user passed the password step and still
# has to present a second factor.
_PENDING_2FA_SESSION_KEY = 'pending_2fa_user_id'


def _login_success_payload(user):
    """Build the JSON-serialisable body returned after a successful login."""
    return {
        'message': 'Login successful',
        'user': {
            'id': user.id,
            'username': user.username,
            'email': user.email,
            'theme': user.theme,
            'first_login': user.first_login,
            'must_change_credentials': user.must_change_credentials,
            'language': user.language,
            'unit_system': user.unit_system,
            'currency': user.currency,
            'photo': user.photo,
        },
    }


@auth_bp.route('/login', methods=['POST'])
def login():
    """Authenticate with username and password.

    Expected JSON keys: ``username`` and ``password`` (strings).

    For accounts with two-factor authentication enabled no login session is
    created yet: the user id is stored in the session as "pending" and the
    response asks the client to call ``/verify-2fa``.

    Returns:
        A ``(response, status)`` tuple:
        200 with the user payload, or with ``requires_2fa``/``user_id``,
        400 for a missing body or missing fields,
        401 for wrong credentials,
        403 when e-mail verification is required but not done,
        500 on an unexpected error (details go to the app log only).
    """
    # Local import: `session` is not among this module's top-level imports.
    from flask import session

    try:
        # silent=True: malformed JSON yields None instead of an HTTP error
        # that the broad handler below would report as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        username = data.get('username')
        password = data.get('password')
        if not username or not password:
            return jsonify({'error': 'Missing username or password'}), 400

        # Non-string values can never match stored credentials.
        if not isinstance(username, str) or not isinstance(password, str):
            return jsonify({'error': 'Invalid username or password'}), 401

        user = User.query.filter_by(username=username).first()
        if not user or not user.check_password(password):
            return jsonify({'error': 'Invalid username or password'}), 401

        if current_app.config.get('ENABLE_EMAIL_VERIFICATION', False) and not user.email_verified:
            return jsonify({'error': 'Please verify your email before logging in'}), 403

        if user.two_factor_enabled:
            # Bind the second step to this session so /verify-2fa cannot be
            # called for an arbitrary user id without knowing the password.
            session[_PENDING_2FA_SESSION_KEY] = user.id
            return jsonify({
                'requires_2fa': True,
                'user_id': user.id,
            }), 200

        session.pop(_PENDING_2FA_SESSION_KEY, None)
        login_user(user, remember=True)
        user.last_login = datetime.utcnow()
        db.session.commit()

        return jsonify(_login_success_payload(user)), 200

    except Exception:
        db.session.rollback()
        # Keep exception details in the server log; do not expose them to
        # the client.
        current_app.logger.exception('Login error')
        return jsonify({'error': 'Login failed'}), 500


@auth_bp.route('/verify-2fa', methods=['POST'])
def verify_2fa():
    """Complete a login by checking a TOTP code or a backup code.

    Expected JSON keys: ``user_id`` and ``code``. ``user_id`` must match the
    user that just passed the password step in this same session (recorded
    by ``/login``); otherwise the request is rejected.

    Returns:
        A ``(response, status)`` tuple:
        200 with the user payload on success,
        400 for missing fields or an invalid/unbound request,
        401 for a wrong code.
    """
    # Local import: `session` is not among this module's top-level imports.
    from flask import session

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing or malformed body: fall through to the "missing fields" reply.
        data = {}

    user_id = data.get('user_id')
    code = data.get('code')
    if not user_id or not code:
        return jsonify({'error': 'Missing required fields'}), 400

    # Only the user who proved knowledge of the password in this session may
    # attempt the second factor. Compared as strings so a client sending the
    # id as "5" or 5 is treated the same.
    pending_user_id = session.get(_PENDING_2FA_SESSION_KEY)
    if pending_user_id is None or str(pending_user_id) != str(user_id):
        return jsonify({'error': 'Invalid request'}), 400

    user = User.query.get(pending_user_id)
    if not user or not user.two_factor_enabled:
        return jsonify({'error': 'Invalid request'}), 400

    totp = pyotp.TOTP(user.two_factor_secret)

    # The backup code is only consulted when the TOTP check fails.
    # NOTE(review): there is no attempt limiting here; consider rate limiting.
    if totp.verify(code) or user.verify_backup_code(code):
        session.pop(_PENDING_2FA_SESSION_KEY, None)
        login_user(user, remember=True)
        user.last_login = datetime.utcnow()
        db.session.commit()

        return jsonify(_login_success_payload(user)), 200

    return jsonify({'error': 'Invalid 2FA code'}), 401
|
|
|
|
@auth_bp.route('/setup-2fa', methods=['POST'])
@login_required
def setup_2fa():
    """Start two-factor enrolment for the logged-in user.

    Stores a fresh TOTP secret on the user, generates backup codes, commits,
    and returns the secret, a QR code (PNG as a base64 data URI) encoding the
    provisioning URI, and the backup codes. Responds with 400 when 2FA is
    already enabled.
    """
    if current_user.two_factor_enabled:
        return jsonify({'error': '2FA is already enabled'}), 400

    new_secret = pyotp.random_base32()
    current_user.two_factor_secret = new_secret

    enrolment_uri = pyotp.TOTP(new_secret).provisioning_uri(
        name=current_user.email,
        issuer_name='Masina-Dock',
    )

    # Render the provisioning URI as a QR code image.
    qr_builder = qrcode.QRCode(version=1, box_size=10, border=5)
    qr_builder.add_data(enrolment_uri)
    qr_builder.make(fit=True)
    qr_image = qr_builder.make_image(fill_color="black", back_color="white")

    # Serialise the image to PNG bytes in memory and base64-encode them.
    png_stream = io.BytesIO()
    qr_image.save(png_stream, format='PNG')
    encoded_png = base64.b64encode(png_stream.getvalue()).decode()

    recovery_codes = current_user.generate_backup_codes()

    db.session.commit()

    response_body = {
        'secret': new_secret,
        'qr_code': f'data:image/png;base64,{encoded_png}',
        'backup_codes': recovery_codes,
    }
    return jsonify(response_body), 200
|
|
|
|
@auth_bp.route('/enable-2fa', methods=['POST'])
@login_required
def enable_2fa():
    """Activate two-factor authentication after confirming a TOTP code.

    Expected JSON key: ``code``.

    Returns:
        A ``(response, status)`` tuple: 200 when the code verifies against
        the stored secret (the flag is committed), otherwise 400 with an
        error message.
    """
    # silent=True plus the isinstance check: a missing, malformed or
    # non-object body is treated as "no code" instead of raising
    # AttributeError on None.get().
    data = request.get_json(silent=True)
    code = data.get('code') if isinstance(data, dict) else None

    if not code:
        return jsonify({'error': 'Verification code required'}), 400

    if not current_user.two_factor_secret:
        return jsonify({'error': 'Please setup 2FA first'}), 400

    totp = pyotp.TOTP(current_user.two_factor_secret)

    if totp.verify(code):
        current_user.two_factor_enabled = True
        db.session.commit()
        return jsonify({'message': '2FA enabled successfully'}), 200

    return jsonify({'error': 'Invalid verification code'}), 400
|
|
|
|
@auth_bp.route('/disable-2fa', methods=['POST'])
@login_required
def disable_2fa():
    """Turn off two-factor authentication after re-checking the password.

    Expected JSON key: ``password`` (string).

    Returns:
        A ``(response, status)`` tuple: 200 after the 2FA flag, secret and
        backup codes were cleared and committed, 400 when no usable password
        was supplied, 401 when the password is wrong.
    """
    # silent=True plus the isinstance check: a missing, malformed or
    # non-object body is treated as "no password" instead of raising
    # AttributeError on None.get().
    data = request.get_json(silent=True)
    password = data.get('password') if isinstance(data, dict) else None

    if not password or not isinstance(password, str):
        return jsonify({'error': 'Password required'}), 400

    if not current_user.check_password(password):
        return jsonify({'error': 'Invalid password'}), 401

    current_user.two_factor_enabled = False
    current_user.two_factor_secret = None
    current_user.backup_codes = None
    db.session.commit()

    return jsonify({'message': '2FA disabled successfully'}), 200
|
|
|
|
@auth_bp.route('/logout', methods=['POST'])
@login_required
def logout():
    """Log the current user out and confirm with a JSON message (HTTP 200)."""
    logout_user()
    confirmation = jsonify({'message': 'Logout successful'})
    return confirmation, 200
|
|
|
|
@auth_bp.route('/me', methods=['GET'])
@login_required
def get_current_user():
    """Return selected attributes of the logged-in user as JSON (HTTP 200)."""
    # Attribute names exposed to the client, in response order.
    exposed_fields = (
        'id',
        'username',
        'email',
        'theme',
        'language',
        'unit_system',
        'currency',
        'photo',
        'must_change_credentials',
        'email_verified',
        'two_factor_enabled',
    )
    profile = {field: getattr(current_user, field) for field in exposed_fields}
    return jsonify(profile), 200
|
|
|
|
@auth_bp.route('/update-credentials', methods=['POST'])
@login_required
def update_credentials():
    """Replace the logged-in user's username, e-mail and password.

    Expected JSON keys: ``username``, ``email`` and ``password`` (strings).

    All input is validated before the user object is touched, and the
    verification e-mail (sent only when the address changed and
    ``ENABLE_EMAIL_VERIFICATION`` is set) goes out only after the commit.

    Returns:
        A ``(response, status)`` tuple: 200 on success, 400 for missing or
        invalid fields, a taken username/e-mail or a password policy
        violation.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing or malformed body: fall through to the "missing fields" reply.
        data = {}

    new_username = data.get('username')
    new_email = data.get('email')
    new_password = data.get('password')

    if not new_username or not new_email or not new_password:
        return jsonify({'error': 'Missing required fields'}), 400

    # JSON numbers/lists/objects would otherwise crash the validators.
    if not all(isinstance(value, str) for value in (new_username, new_email, new_password)):
        return jsonify({'error': 'Username, email and password must be strings'}), 400

    # --- validate everything first; nothing is modified yet ---
    username_changed = new_username != current_user.username
    if username_changed and User.query.filter_by(username=new_username).first():
        return jsonify({'error': 'Username already taken'}), 400

    email_changed = new_email != current_user.email
    if email_changed:
        if not validate_email(new_email):
            return jsonify({'error': 'Invalid email format'}), 400
        if User.query.filter_by(email=new_email).first():
            return jsonify({'error': 'Email already registered'}), 400

    is_valid, message = validate_password(new_password)
    if not is_valid:
        return jsonify({'error': message}), 400

    # --- apply the changes ---
    needs_verification = bool(
        email_changed and current_app.config.get('ENABLE_EMAIL_VERIFICATION', False)
    )
    token = None

    if username_changed:
        current_user.username = new_username

    if email_changed:
        current_user.email = new_email
        if needs_verification:
            current_user.email_verified = False
            token = current_user.generate_email_verification_token()

    current_user.set_password(new_password)
    current_user.must_change_credentials = False
    current_user.first_login = False

    db.session.commit()

    # Mail only after the commit, so the link always refers to a stored token.
    if needs_verification:
        send_verification_email(current_user, token)

    return jsonify({'message': 'Credentials updated successfully'}), 200
|
|
|
|
@auth_bp.route('/change-password', methods=['POST'])
@login_required
def change_password():
    """Change the logged-in user's password.

    Expected JSON keys: ``current_password`` and ``new_password`` (strings).

    Returns:
        A ``(response, status)`` tuple: 200 after the new password was set
        and committed, 400 when a field is missing or not a string, the
        current password is wrong, or the new password violates the policy.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing or malformed body: fall through to the "missing fields" reply.
        data = {}

    # .get() instead of indexing: a missing key becomes a 400 reply instead
    # of an unhandled KeyError (HTTP 500).
    current_password = data.get('current_password')
    new_password = data.get('new_password')

    if current_password is None or new_password is None:
        return jsonify({'error': 'Missing required fields'}), 400

    if not isinstance(current_password, str) or not isinstance(new_password, str):
        return jsonify({'error': 'Passwords must be strings'}), 400

    if not current_user.check_password(current_password):
        return jsonify({'error': 'Current password is incorrect'}), 400

    is_valid, message = validate_password(new_password)
    if not is_valid:
        return jsonify({'error': message}), 400

    current_user.set_password(new_password)
    current_user.first_login = False
    db.session.commit()

    return jsonify({'message': 'Password changed successfully'}), 200
|