Initial commit - SoundWave v1.0

- Full PWA support with offline capabilities
- Comprehensive search across songs, playlists, and channels
- Offline playlist manager with download tracking
- Pre-built frontend for zero-build deployment
- Docker-based deployment with docker compose
- Material-UI dark theme interface
- YouTube audio download and management
- Multi-user authentication support
This commit is contained in:
Iulian 2025-12-16 23:43:07 +00:00
commit 51679d1943
254 changed files with 37281 additions and 0 deletions

View file

@ -0,0 +1,464 @@
# Multi-Tenant Admin System - Implementation Guide
## Overview
This system transforms SoundWave into a multi-tenant platform where:
- **Admins** can manage all users and their content
- **Users** have isolated YouTube accounts, channels, playlists, and audio files
- Each user operates as if they have their own Docker container
- Resource limits (storage, channels, playlists) are enforced per user
## Architecture
### User Isolation Model
```
Admin User (is_admin=True)
├── Can create/manage all users
├── Access all content across users
└── Set resource quotas
Regular User
├── Own YouTube accounts
├── Own channels (subscriptions)
├── Own playlists
├── Own audio files
└── Cannot see other users' data
```
### Database Schema Changes
**Account Model** (`user/models.py`):
```python
- storage_quota_gb: int (default 50 GB)
- storage_used_gb: float (tracked automatically)
- max_channels: int (default 50)
- max_playlists: int (default 100)
- user_notes: text (admin notes)
- created_by: ForeignKey to admin who created user
```
**UserYouTubeAccount Model** (NEW):
```python
- user: ForeignKey to Account
- account_name: str (friendly name)
- youtube_channel_id: str
- youtube_channel_name: str
- cookies_file: text (for authentication)
- auto_download: bool
- download_quality: choices
```
**Channel Model** (UPDATED):
```python
+ owner: ForeignKey to Account
+ youtube_account: ForeignKey to UserYouTubeAccount
+ auto_download: bool per channel
+ download_quality: choices per channel
```
**Audio Model** (UPDATED):
```python
+ owner: ForeignKey to Account
```
**Playlist Model** (UPDATED):
```python
+ owner: ForeignKey to Account
+ auto_download: bool per playlist
```
### Unique Constraints
- **Channel**: `(owner, channel_id)` - Each user can subscribe once per channel
- **Audio**: `(owner, youtube_id)` - Each user can have one copy of each video
- **Playlist**: `(owner, playlist_id)` - Each user can subscribe once per playlist
## Backend Implementation
### Middleware (`config/middleware.py`)
**UserIsolationMiddleware**:
- Adds `request.filter_by_user()` helper
- Automatically filters querysets by owner
- Admins bypass filtering
**StorageQuotaMiddleware**:
- Tracks storage usage
- Prevents uploads when quota exceeded
### Permissions (`common/permissions.py`)
**IsOwnerOrAdmin**:
- Users can only access their own objects
- Admins can access everything
**CanManageUsers**:
- Only admins can manage users
**WithinQuotaLimits**:
- Checks storage/channel/playlist quotas
- Admins bypass quota checks
### Admin API (`user/views_admin.py`)
**UserManagementViewSet**:
```python
GET /api/user/admin/users/ # List users
POST /api/user/admin/users/ # Create user
GET /api/user/admin/users/{id}/ # User details
PATCH /api/user/admin/users/{id}/ # Update user
GET /api/user/admin/users/{id}/stats/ # User statistics
POST /api/user/admin/users/{id}/reset_storage/
POST /api/user/admin/users/{id}/reset_2fa/
POST /api/user/admin/users/{id}/toggle_active/
GET /api/user/admin/users/{id}/channels/
GET /api/user/admin/users/{id}/playlists/
GET /api/user/admin/users/system_stats/ # System-wide stats
```
**UserYouTubeAccountViewSet**:
```python
GET /api/user/admin/youtube-accounts/ # List accounts
POST /api/user/admin/youtube-accounts/ # Add account
GET /api/user/admin/youtube-accounts/{id}/ # Account details
PATCH /api/user/admin/youtube-accounts/{id}/ # Update account
DELETE /api/user/admin/youtube-accounts/{id}/ # Delete account
POST /api/user/admin/youtube-accounts/{id}/verify/ # Verify credentials
POST /api/user/admin/youtube-accounts/{id}/toggle_active/
```
### Django Admin (`user/admin_users.py`)
Enhanced admin interface with:
- User list with storage/channel/playlist counts
- Visual storage progress bars
- Bulk actions (reset storage, disable users, reset 2FA)
- YouTube account management
- Per-user notes
## Frontend Implementation
### AdminUsersPage Component
**Features**:
- System statistics dashboard (users, content, storage)
- Users table with status, storage, content counts
- Create user dialog with full settings
- Edit user dialog with quota management
- User details modal with comprehensive info
- Quick actions (activate/deactivate, reset storage, reset 2FA)
**UI Components**:
```tsx
- System stats cards (users, content, storage)
- Users table (sortable, filterable)
- Create user form (username, email, password, quotas)
- Edit user form (quotas, status, permissions)
- User details modal (all stats and metadata)
- Actions menu (edit, toggle, reset)
```
## Migration Strategy
### Step 1: Run Migrations
```bash
# Create migrations
python manage.py makemigrations user channel audio playlist
# Apply migrations
python manage.py migrate
# Create superuser
python manage.py createsuperuser
```
### Step 2: Data Migration
For existing data, create a data migration to set owner fields:
```python
# Create empty migration
python manage.py makemigrations --empty user --name set_default_owner
# Edit migration file
def set_default_owner(apps, schema_editor):
Account = apps.get_model('user', 'Account')
Channel = apps.get_model('channel', 'Channel')
Audio = apps.get_model('audio', 'Audio')
Playlist = apps.get_model('playlist', 'Playlist')
# Get or create default admin user
admin = Account.objects.filter(is_superuser=True).first()
if not admin:
admin = Account.objects.create_superuser(
username='admin',
email='admin@example.com',
password='changeme'
)
# Assign owner to existing records
Channel.objects.filter(owner__isnull=True).update(owner=admin)
Audio.objects.filter(owner__isnull=True).update(owner=admin)
Playlist.objects.filter(owner__isnull=True).update(owner=admin)
```
### Step 3: Update Views
Update existing views to use owner filtering:
```python
# Before
Audio.objects.all()
# After
Audio.objects.filter(owner=request.user)
# or use middleware
request.filter_by_user(Audio.objects.all())
```
### Step 4: Update Serializers
Ensure owner is set on create:
```python
def perform_create(self, serializer):
serializer.save(owner=self.request.user)
```
## Usage Examples
### Admin Creating User
```bash
POST /api/user/admin/users/
{
"username": "john_doe",
"email": "john@example.com",
"password": "secure123",
"password_confirm": "secure123",
"storage_quota_gb": 100,
"max_channels": 75,
"max_playlists": 150,
"is_admin": false,
"is_active": true,
"user_notes": "Premium user - increased quotas"
}
```
### User Adding YouTube Account
```bash
POST /api/user/admin/youtube-accounts/
{
"account_name": "Personal YouTube",
"youtube_channel_id": "UCxxxxxxxx",
"youtube_channel_name": "John's Channel",
"cookies_file": "# Netscape HTTP Cookie File...",
"auto_download": true,
"download_quality": "high"
}
```
### User Subscribing to Channel
```bash
POST /api/channels/
{
"channel_id": "UCxxxxxxxx",
"channel_name": "Tech Channel",
"youtube_account": 1, # User's YouTube account ID
"subscribed": true,
"auto_download": true,
"download_quality": "auto"
}
```
## Resource Quota Enforcement
### Storage Quota
```python
# Checked before download
if user.storage_used_gb >= user.storage_quota_gb:
raise PermissionDenied("Storage quota exceeded")
# Updated after download
file_size_gb = file_size_bytes / (1024**3)
user.storage_used_gb += file_size_gb
user.save()
# Updated after deletion
user.storage_used_gb -= file_size_gb
user.save()
```
### Channel Limit
```python
# Checked before subscribing
if not user.can_add_channel:
raise PermissionDenied(f"Channel limit reached ({user.max_channels})")
# Property in Account model
@property
def can_add_channel(self):
current_count = self.channels.count()
return current_count < self.max_channels
```
### Playlist Limit
```python
# Checked before creating
if not user.can_add_playlist:
raise PermissionDenied(f"Playlist limit reached ({user.max_playlists})")
# Property in Account model
@property
def can_add_playlist(self):
current_count = self.playlists.count()
return current_count < self.max_playlists
```
## Security Considerations
### Data Isolation
1. **Queryset Filtering**: All queries automatically filtered by owner
2. **Middleware**: UserIsolationMiddleware enforces filtering
3. **Permissions**: IsOwnerOrAdmin checks object-level permissions
4. **Admin Bypass**: Admins can access all data for management
### Authentication
1. **User Authentication**: Standard Django auth with 2FA support
2. **YouTube Authentication**: Cookie-based (stored per user)
3. **API Authentication**: Token-based with user context
### File Storage
User files should be stored in isolated directories:
```python
# File path structure
/media/
└── users/
├── user_1/
│ ├── audio/
│ ├── thumbnails/
│ └── cookies/
├── user_2/
│ ├── audio/
│ ├── thumbnails/
│ └── cookies/
└── ...
```
## Celery Tasks
Update tasks to respect user isolation:
```python
@shared_task
def download_audio(audio_id, user_id):
audio = Audio.objects.get(id=audio_id, owner_id=user_id)
user = audio.owner
# Use user's YouTube account
youtube_account = audio.channel.youtube_account
cookies_file = youtube_account.cookies_file if youtube_account else None
# Download to user's directory
output_path = f'/media/users/user_{user_id}/audio/'
# Check quota before download
if user.storage_used_gb >= user.storage_quota_gb:
raise Exception("Storage quota exceeded")
# Download...
# Update storage
user.storage_used_gb += file_size_gb
user.save()
```
## Testing
### Test User Isolation
```python
def test_user_cannot_access_other_user_data():
user1 = Account.objects.create_user('user1', 'user1@test.com', 'pass')
user2 = Account.objects.create_user('user2', 'user2@test.com', 'pass')
audio1 = Audio.objects.create(owner=user1, youtube_id='xxx')
audio2 = Audio.objects.create(owner=user2, youtube_id='yyy')
# User1 should only see their audio
assert Audio.objects.filter(owner=user1).count() == 1
assert Audio.objects.filter(owner=user2).count() == 1
```
### Test Quota Enforcement
```python
def test_storage_quota_enforced():
user = Account.objects.create_user(
'user', 'user@test.com', 'pass',
storage_quota_gb=10,
storage_used_gb=10
)
# Should fail when quota exceeded
with pytest.raises(PermissionDenied):
download_audio(audio_id, user.id)
```
## Performance Optimization
### Database Indexes
```python
class Meta:
indexes = [
models.Index(fields=['owner', 'youtube_id']),
models.Index(fields=['owner', 'channel_id']),
models.Index(fields=['owner', '-published_date']),
]
```
### Query Optimization
```python
# Use select_related for foreign keys
Audio.objects.filter(owner=user).select_related('owner')
# Use prefetch_related for reverse relations
User.objects.prefetch_related('channels', 'playlists', 'audio_files')
```
### Caching
```python
# Cache user stats
cache_key = f'user_stats_{user.id}'
stats = cache.get(cache_key)
if not stats:
stats = calculate_user_stats(user)
cache.set(cache_key, stats, 300) # 5 minutes
```
## Future Enhancements
- [ ] User groups and team accounts
- [ ] Shared playlists between users
- [ ] Storage pooling for organizations
- [ ] Usage analytics per user
- [ ] API rate limiting per user
- [ ] Custom branding per user
- [ ] Billing and subscription management
- [ ] OAuth integration for YouTube
- [ ] Automated quota adjustment based on usage
- [ ] User data export/import

View file

@ -0,0 +1,239 @@
# User Registration Policy
## Public Registration Status: DISABLED ❌
Public user registration is **disabled** in SoundWave. This is a security feature for multi-tenant deployments.
## User Creation
### Admin-Only User Creation
Only administrators can create new user accounts through:
1. **Django Admin Panel**:
```
http://localhost:8888/admin/user/account/add/
```
2. **REST API** (Admin only):
```bash
POST /api/user/admin/users/
{
"username": "newuser",
"email": "user@example.com",
"password": "SecurePass123",
"password_confirm": "SecurePass123",
"storage_quota_gb": 50,
"max_channels": 50,
"max_playlists": 100,
"is_admin": false,
"is_active": true
}
```
3. **Frontend Admin Panel**:
- Navigate to Admin Users page
- Click "Create User" button
- Fill in user details and resource quotas
### Django Management Command
Admins can also use Django management commands:
```bash
# Create regular user
python manage.py createsuperuser
# Or use shell
python manage.py shell
>>> from user.models import Account
>>> user = Account.objects.create_user(
... username='john_doe',
... email='john@example.com',
... password='SecurePass123'
... )
>>> user.storage_quota_gb = 100
>>> user.max_channels = 75
>>> user.save()
```
## Attempted Public Registration
If someone attempts to access the registration endpoint:
**Request**:
```bash
POST /api/user/register/
{
"username": "newuser",
"email": "user@example.com",
"password": "password123"
}
```
**Response** (403 Forbidden):
```json
{
"error": "Public registration is disabled",
"message": "New users can only be created by administrators. Please contact your system administrator for account creation."
}
```
## Configuration
Registration policy is controlled in `config/user_settings.py`:
```python
# Public registration disabled - only admins can create users
ALLOW_PUBLIC_REGISTRATION = False
```
### To Enable Public Registration (Not Recommended)
If you need to enable public registration for testing or specific use cases:
1. Edit `config/user_settings.py`:
```python
ALLOW_PUBLIC_REGISTRATION = True
```
2. Implement registration logic in `user/views.py` RegisterView
3. Add frontend registration form (not included by default)
**⚠️ Warning**: Enabling public registration bypasses the multi-tenant security model and allows anyone to create accounts.
## Security Benefits
### Why Registration is Disabled
1. **Resource Control**: Admins control who gets accounts and resource quotas
2. **Quality Control**: Prevents spam accounts and abuse
3. **Multi-Tenancy**: Each user is a "tenant" with isolated data
4. **Storage Management**: Admins allocate storage based on needs
5. **Compliance**: Controlled user base for compliance requirements
6. **Billing**: Users can be tied to billing/subscription models
### Admin Capabilities
Admins have full control over:
- User creation and deletion
- Resource quotas (storage, channels, playlists)
- Account activation/deactivation
- 2FA reset
- Storage usage monitoring
- User permissions (admin/regular)
## User Onboarding Flow
### Recommended Process
1. **Request**: User requests account via email/form
2. **Admin Review**: Admin reviews request
3. **Account Creation**: Admin creates account with appropriate quotas
4. **Credentials**: Admin sends credentials to user securely
5. **First Login**: User logs in and changes password
6. **2FA Setup**: User sets up 2FA (recommended)
### Example Onboarding Email
```
Welcome to SoundWave!
Your account has been created:
- Username: john_doe
- Temporary Password: [generated_password]
Storage Quota: 50 GB
Max Channels: 50
Max Playlists: 100
Please login and change your password immediately:
http://soundwave.example.com/
For security, we recommend enabling 2FA in Settings.
Questions? Contact: admin@example.com
```
## API Endpoints
### Public Endpoints (No Auth Required)
- `POST /api/user/login/` - User login
- `POST /api/user/register/` - Returns 403 (disabled)
### Authenticated Endpoints
- `GET /api/user/account/` - Get current user
- `POST /api/user/logout/` - Logout
- `GET /api/user/config/` - User settings
### Admin-Only Endpoints
- `GET /api/user/admin/users/` - List all users
- `POST /api/user/admin/users/` - Create new user
- `PATCH /api/user/admin/users/{id}/` - Update user
- `POST /api/user/admin/users/{id}/reset_storage/` - Reset storage
- `POST /api/user/admin/users/{id}/toggle_active/` - Activate/deactivate
## Password Requirements
When creating users, passwords must meet these requirements:
```python
PASSWORD_MIN_LENGTH = 8
PASSWORD_REQUIRE_UPPERCASE = True
PASSWORD_REQUIRE_LOWERCASE = True
PASSWORD_REQUIRE_NUMBERS = True
PASSWORD_REQUIRE_SPECIAL = False # Optional
```
Example valid passwords:
- `SecurePass123`
- `MyPassword1`
- `Admin2024Test`
## Future Enhancements
Potential features for user management:
- [ ] Invitation system (admin sends invite links)
- [ ] Approval workflow (users request, admin approves)
- [ ] Self-service password reset
- [ ] Email verification
- [ ] Account expiration dates
- [ ] Welcome email templates
- [ ] User onboarding wizard
- [ ] Bulk user import from CSV
- [ ] SSO/LDAP integration
- [ ] OAuth providers (Google, GitHub)
## Troubleshooting
### "Registration is disabled" error
**Cause**: Public registration is intentionally disabled.
**Solution**: Contact system administrator to create an account.
### Cannot create users
**Cause**: User is not an admin.
**Solution**: Only admin users (`is_admin=True` or `is_superuser=True`) can create users.
### How to create first admin?
```bash
python manage.py createsuperuser
```
This creates the first admin who can then create other users.
## Best Practices
1. **Strong Passwords**: Enforce strong password requirements
2. **Enable 2FA**: Require 2FA for admin accounts
3. **Audit Logs**: Track user creation and modifications
4. **Resource Planning**: Allocate quotas based on user needs
5. **Regular Review**: Periodically review active users
6. **Offboarding**: Deactivate accounts for departed users
7. **Backup**: Regular database backups including user data
8. **Documentation**: Keep user list and quotas documented

0
backend/user/__init__.py Normal file
View file

5
backend/user/admin.py Normal file
View file

@ -0,0 +1,5 @@
"""User admin - Import enhanced admin from admin_users"""
from user.admin_users import AccountAdmin, UserYouTubeAccountAdmin
# Admin classes are registered in admin_users.py

243
backend/user/admin_users.py Normal file
View file

@ -0,0 +1,243 @@
"""Admin interface for user management"""
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from django.utils.html import format_html
from django.urls import reverse
from django.utils.safestring import mark_safe
from user.models import Account, UserYouTubeAccount
@admin.register(Account)
class AccountAdmin(BaseUserAdmin):
"""Enhanced admin for Account model with user management"""
list_display = [
'username',
'email',
'is_admin',
'is_active',
'storage_usage',
'channel_count',
'playlist_count',
'date_joined',
'last_login',
]
list_filter = [
'is_admin',
'is_active',
'is_staff',
'is_superuser',
'two_factor_enabled',
'date_joined',
]
search_fields = ['username', 'email']
fieldsets = (
('Account Info', {
'fields': ('username', 'email', 'password')
}),
('Permissions', {
'fields': (
'is_active',
'is_staff',
'is_admin',
'is_superuser',
'groups',
'user_permissions',
)
}),
('Resource Limits', {
'fields': (
'storage_quota_gb',
'storage_used_gb',
'max_channels',
'max_playlists',
)
}),
('Security', {
'fields': (
'two_factor_enabled',
'two_factor_secret',
)
}),
('Metadata', {
'fields': (
'user_notes',
'created_by',
'date_joined',
'last_login',
)
}),
)
add_fieldsets = (
('Create New User', {
'classes': ('wide',),
'fields': (
'username',
'email',
'password1',
'password2',
'is_admin',
'is_active',
'storage_quota_gb',
'max_channels',
'max_playlists',
'user_notes',
),
}),
)
readonly_fields = ['date_joined', 'last_login', 'storage_used_gb']
ordering = ['-date_joined']
def storage_usage(self, obj):
"""Display storage usage with progress bar"""
percent = obj.storage_percent_used
if percent > 90:
color = 'red'
elif percent > 75:
color = 'orange'
else:
color = 'green'
return format_html(
'<div style="width:100px; background-color:#f0f0f0; border:1px solid #ccc;">'
'<div style="width:{}%; background-color:{}; height:20px; text-align:center; color:white;">'
'{:.1f}%'
'</div></div>',
min(percent, 100),
color,
percent
)
storage_usage.short_description = 'Storage'
def channel_count(self, obj):
"""Display channel count with limit"""
from channel.models import Channel
count = Channel.objects.filter(owner=obj).count()
return format_html(
'<span style="color: {};">{} / {}</span>',
'red' if count >= obj.max_channels else 'green',
count,
obj.max_channels
)
channel_count.short_description = 'Channels'
def playlist_count(self, obj):
"""Display playlist count with limit"""
from playlist.models import Playlist
count = Playlist.objects.filter(owner=obj).count()
return format_html(
'<span style="color: {};">{} / {}</span>',
'red' if count >= obj.max_playlists else 'green',
count,
obj.max_playlists
)
playlist_count.short_description = 'Playlists'
def save_model(self, request, obj, form, change):
"""Set created_by for new users"""
if not change and request.user.is_authenticated:
obj.created_by = request.user
super().save_model(request, obj, form, change)
actions = [
'reset_storage_quota',
'disable_users',
'enable_users',
'reset_2fa',
]
def reset_storage_quota(self, request, queryset):
"""Reset storage usage to 0"""
count = queryset.update(storage_used_gb=0.0)
self.message_user(request, f'Reset storage for {count} users')
reset_storage_quota.short_description = 'Reset storage usage'
def disable_users(self, request, queryset):
"""Disable selected users"""
count = queryset.update(is_active=False)
self.message_user(request, f'Disabled {count} users')
disable_users.short_description = 'Disable selected users'
def enable_users(self, request, queryset):
"""Enable selected users"""
count = queryset.update(is_active=True)
self.message_user(request, f'Enabled {count} users')
enable_users.short_description = 'Enable selected users'
def reset_2fa(self, request, queryset):
"""Reset 2FA for selected users"""
count = queryset.update(
two_factor_enabled=False,
two_factor_secret='',
backup_codes=[]
)
self.message_user(request, f'Reset 2FA for {count} users')
reset_2fa.short_description = 'Reset 2FA'
@admin.register(UserYouTubeAccount)
class UserYouTubeAccountAdmin(admin.ModelAdmin):
"""Admin for YouTube accounts"""
list_display = [
'user',
'account_name',
'youtube_channel_name',
'is_active',
'auto_download',
'download_quality',
'created_date',
]
list_filter = [
'is_active',
'auto_download',
'download_quality',
'created_date',
]
search_fields = [
'user__username',
'account_name',
'youtube_channel_name',
'youtube_channel_id',
]
fieldsets = (
('Account Info', {
'fields': (
'user',
'account_name',
'youtube_channel_id',
'youtube_channel_name',
)
}),
('Authentication', {
'fields': (
'cookies_file',
'is_active',
'last_verified',
)
}),
('Download Settings', {
'fields': (
'auto_download',
'download_quality',
)
}),
)
readonly_fields = ['created_date', 'last_verified']
def get_queryset(self, request):
"""Filter by user if not admin"""
qs = super().get_queryset(request)
if request.user.is_superuser or request.user.is_admin:
return qs
return qs.filter(user=request.user)

View file

152
backend/user/models.py Normal file
View file

@ -0,0 +1,152 @@
"""User models"""
from django.contrib.auth.models import AbstractUser, BaseUserManager
from django.db import models
class AccountManager(BaseUserManager):
"""Custom user manager"""
def create_user(self, username, email, password=None):
"""Create regular user"""
if not email:
raise ValueError('Users must have an email address')
if not username:
raise ValueError('Users must have a username')
user = self.model(
email=self.normalize_email(email),
username=username,
)
user.set_password(password)
user.save(using=self._db)
return user
def create_superuser(self, username, email, password):
"""Create superuser"""
user = self.create_user(
email=self.normalize_email(email),
password=password,
username=username,
)
user.is_admin = True
user.is_staff = True
user.is_superuser = True
user.save(using=self._db)
return user
class Account(AbstractUser):
"""Custom user model"""
email = models.EmailField(verbose_name="email", max_length=60, unique=True)
username = models.CharField(max_length=30, unique=True)
date_joined = models.DateTimeField(verbose_name='date joined', auto_now_add=True)
last_login = models.DateTimeField(verbose_name='last login', auto_now=True)
is_admin = models.BooleanField(default=False)
is_active = models.BooleanField(default=True)
is_staff = models.BooleanField(default=False)
is_superuser = models.BooleanField(default=False)
# 2FA fields
two_factor_enabled = models.BooleanField(default=False)
two_factor_secret = models.CharField(max_length=32, blank=True, null=True)
backup_codes = models.JSONField(default=list, blank=True)
# User isolation and resource limits
storage_quota_gb = models.IntegerField(default=50, help_text="Storage quota in GB")
storage_used_gb = models.FloatField(default=0.0, help_text="Storage used in GB")
max_channels = models.IntegerField(default=50, help_text="Maximum channels allowed")
max_playlists = models.IntegerField(default=100, help_text="Maximum playlists allowed")
# User metadata
user_notes = models.TextField(blank=True, help_text="Admin notes about this user")
created_by = models.ForeignKey(
'self',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='created_users',
help_text="Admin who created this user"
)
avatar = models.CharField(
max_length=500,
blank=True,
null=True,
help_text="Path to user avatar image or preset avatar number (1-5)"
)
USERNAME_FIELD = 'username'
REQUIRED_FIELDS = ['email']
objects = AccountManager()
def __str__(self):
return self.username
@property
def storage_percent_used(self):
"""Calculate storage usage percentage"""
if self.storage_quota_gb == 0:
return 0
return (self.storage_used_gb / self.storage_quota_gb) * 100
@property
def can_add_channel(self):
"""Check if user can add more channels"""
from channel.models import Channel
current_count = Channel.objects.filter(owner=self).count()
return current_count < self.max_channels
@property
def can_add_playlist(self):
"""Check if user can add more playlists"""
from playlist.models import Playlist
current_count = Playlist.objects.filter(owner=self).count()
return current_count < self.max_playlists
def calculate_storage_usage(self):
"""Calculate and update actual storage usage from audio files"""
from audio.models import Audio
from django.db.models import Sum
total_bytes = Audio.objects.filter(owner=self).aggregate(
total=Sum('file_size')
)['total'] or 0
# Convert bytes to GB
self.storage_used_gb = round(total_bytes / (1024 ** 3), 2)
self.save(update_fields=['storage_used_gb'])
return self.storage_used_gb
class UserYouTubeAccount(models.Model):
"""User's YouTube account credentials and settings"""
user = models.ForeignKey(Account, on_delete=models.CASCADE, related_name='youtube_accounts')
account_name = models.CharField(max_length=200, help_text="Friendly name for this YouTube account")
# YouTube authentication (for future OAuth integration)
youtube_channel_id = models.CharField(max_length=50, blank=True)
youtube_channel_name = models.CharField(max_length=200, blank=True)
# Cookie-based authentication (current method)
cookies_file = models.TextField(blank=True, help_text="YouTube cookies for authenticated downloads")
# Account status
is_active = models.BooleanField(default=True)
last_verified = models.DateTimeField(null=True, blank=True)
created_date = models.DateTimeField(auto_now_add=True)
# Download preferences
auto_download = models.BooleanField(default=True, help_text="Automatically download new videos")
download_quality = models.CharField(
max_length=20,
default='medium',
choices=[('low', 'Low'), ('medium', 'Medium'), ('high', 'High'), ('ultra', 'Ultra')]
)
class Meta:
ordering = ['-created_date']
unique_together = ('user', 'account_name')
def __str__(self):
return f"{self.user.username} - {self.account_name}"

View file

@ -0,0 +1,71 @@
"""User serializers"""
from rest_framework import serializers
from user.models import Account
class AccountSerializer(serializers.ModelSerializer):
"""Account serializer"""
avatar_url = serializers.SerializerMethodField()
class Meta:
model = Account
fields = [
'id', 'username', 'email', 'date_joined', 'last_login',
'two_factor_enabled', 'avatar', 'avatar_url',
'is_admin', 'is_superuser', 'is_staff',
'storage_quota_gb', 'storage_used_gb',
'max_channels', 'max_playlists'
]
read_only_fields = [
'id', 'date_joined', 'last_login', 'two_factor_enabled', 'avatar_url',
'is_admin', 'is_superuser', 'is_staff',
'storage_used_gb'
]
def get_avatar_url(self, obj):
"""Get avatar URL"""
if not obj.avatar:
return None
# Preset avatars (served from frontend public folder)
if obj.avatar.startswith('preset_'):
return f"/avatars/{obj.avatar}.svg"
# Custom avatars (served from backend)
return f"/api/user/avatar/file/{obj.avatar.split('/')[-1]}/"
class LoginSerializer(serializers.Serializer):
"""Login serializer"""
username = serializers.CharField()
password = serializers.CharField(write_only=True)
two_factor_code = serializers.CharField(required=False, allow_blank=True)
class UserConfigSerializer(serializers.Serializer):
"""User configuration serializer"""
theme = serializers.CharField(default='dark')
items_per_page = serializers.IntegerField(default=50)
audio_quality = serializers.ChoiceField(
choices=['low', 'medium', 'high', 'best'],
default='best'
)
class TwoFactorSetupSerializer(serializers.Serializer):
"""2FA setup response"""
secret = serializers.CharField()
qr_code = serializers.CharField()
backup_codes = serializers.ListField(child=serializers.CharField())
class TwoFactorVerifySerializer(serializers.Serializer):
"""2FA verification"""
code = serializers.CharField(min_length=6, max_length=6)
class TwoFactorStatusSerializer(serializers.Serializer):
"""2FA status"""
enabled = serializers.BooleanField()
backup_codes_count = serializers.IntegerField()

View file

@ -0,0 +1,181 @@
"""Serializers for admin user management"""
from rest_framework import serializers
from user.models import Account, UserYouTubeAccount
from channel.models import Channel
from playlist.models import Playlist
class UserStatsSerializer(serializers.Serializer):
"""User statistics"""
total_channels = serializers.IntegerField()
total_playlists = serializers.IntegerField()
total_audio_files = serializers.IntegerField()
storage_used_gb = serializers.FloatField()
storage_quota_gb = serializers.IntegerField()
storage_percent = serializers.FloatField()
class UserDetailSerializer(serializers.ModelSerializer):
"""Detailed user information for admin"""
storage_percent_used = serializers.FloatField(read_only=True)
can_add_channel = serializers.BooleanField(read_only=True)
can_add_playlist = serializers.BooleanField(read_only=True)
stats = serializers.SerializerMethodField()
created_by_username = serializers.CharField(source='created_by.username', read_only=True)
class Meta:
model = Account
fields = [
'id',
'username',
'email',
'is_admin',
'is_active',
'is_staff',
'is_superuser',
'two_factor_enabled',
'storage_quota_gb',
'storage_used_gb',
'storage_percent_used',
'max_channels',
'max_playlists',
'can_add_channel',
'can_add_playlist',
'user_notes',
'created_by',
'created_by_username',
'date_joined',
'last_login',
'stats',
]
read_only_fields = [
'id',
'date_joined',
'last_login',
'storage_used_gb',
'two_factor_enabled',
]
def get_stats(self, obj):
"""Get user statistics"""
from audio.models import Audio
channels_count = Channel.objects.filter(owner=obj).count()
playlists_count = Playlist.objects.filter(owner=obj).count()
audio_count = Audio.objects.filter(owner=obj).count()
return {
'total_channels': channels_count,
'total_playlists': playlists_count,
'total_audio_files': audio_count,
'storage_used_gb': obj.storage_used_gb,
'storage_quota_gb': obj.storage_quota_gb,
'storage_percent': obj.storage_percent_used,
}
class UserCreateSerializer(serializers.ModelSerializer):
"""Create new user (admin only)"""
password = serializers.CharField(write_only=True, required=True, style={'input_type': 'password'})
password_confirm = serializers.CharField(write_only=True, required=True, style={'input_type': 'password'})
class Meta:
model = Account
fields = [
'username',
'email',
'password',
'password_confirm',
'is_admin',
'is_active',
'storage_quota_gb',
'max_channels',
'max_playlists',
'user_notes',
]
def validate(self, data):
"""Validate password match"""
if data['password'] != data['password_confirm']:
raise serializers.ValidationError({"password": "Passwords do not match"})
return data
def create(self, validated_data):
"""Create user with hashed password"""
validated_data.pop('password_confirm')
password = validated_data.pop('password')
user = Account.objects.create_user(
username=validated_data['username'],
email=validated_data['email'],
password=password,
)
# Update additional fields
for key, value in validated_data.items():
setattr(user, key, value)
# Set created_by from request context
request = self.context.get('request')
if request and request.user.is_authenticated:
user.created_by = request.user
user.save()
return user
class UserUpdateSerializer(serializers.ModelSerializer):
"""Update user (admin only)"""
class Meta:
model = Account
fields = [
'is_admin',
'is_active',
'is_staff',
'storage_quota_gb',
'max_channels',
'max_playlists',
'user_notes',
]
class UserYouTubeAccountSerializer(serializers.ModelSerializer):
"""YouTube account serializer"""
class Meta:
model = UserYouTubeAccount
fields = [
'id',
'account_name',
'youtube_channel_id',
'youtube_channel_name',
'is_active',
'auto_download',
'download_quality',
'created_date',
'last_verified',
]
read_only_fields = ['id', 'created_date', 'last_verified']
class UserYouTubeAccountCreateSerializer(serializers.ModelSerializer):
"""Create YouTube account"""
class Meta:
model = UserYouTubeAccount
fields = [
'account_name',
'youtube_channel_id',
'youtube_channel_name',
'cookies_file',
'auto_download',
'download_quality',
]
def create(self, validated_data):
"""Set user from request context"""
request = self.context.get('request')
if request and request.user.is_authenticated:
validated_data['user'] = request.user
return super().create(validated_data)

158
backend/user/two_factor.py Normal file
View file

@ -0,0 +1,158 @@
"""2FA utility functions"""
import pyotp
import qrcode
import io
import base64
import secrets
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib import colors
from datetime import datetime
def generate_totp_secret():
"""Generate a new TOTP secret"""
return pyotp.random_base32()
def get_totp_uri(secret, username, issuer='SoundWave'):
"""Generate TOTP URI for QR code"""
totp = pyotp.TOTP(secret)
return totp.provisioning_uri(name=username, issuer_name=issuer)
def generate_qr_code(uri):
"""Generate QR code image as base64 string"""
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
# Convert to base64
buffer = io.BytesIO()
img.save(buffer, format='PNG')
buffer.seek(0)
img_base64 = base64.b64encode(buffer.getvalue()).decode()
return f"data:image/png;base64,{img_base64}"
def verify_totp(secret, token):
"""Verify a TOTP token"""
totp = pyotp.TOTP(secret)
return totp.verify(token, valid_window=1)
def generate_backup_codes(count=10):
"""Generate backup codes"""
codes = []
for _ in range(count):
code = '-'.join([
secrets.token_hex(2).upper(),
secrets.token_hex(2).upper(),
secrets.token_hex(2).upper()
])
codes.append(code)
return codes
def generate_backup_codes_pdf(username, codes):
"""Generate PDF with backup codes"""
buffer = io.BytesIO()
# Create PDF
doc = SimpleDocTemplate(buffer, pagesize=letter)
story = []
styles = getSampleStyleSheet()
# Custom styles
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=24,
textColor=colors.HexColor('#1D3557'),
spaceAfter=30,
)
subtitle_style = ParagraphStyle(
'CustomSubtitle',
parent=styles['Normal'],
fontSize=12,
textColor=colors.HexColor('#718096'),
spaceAfter=20,
)
# Title
story.append(Paragraph('SoundWave Backup Codes', title_style))
story.append(Paragraph(f'User: {username}', subtitle_style))
story.append(Paragraph(f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', subtitle_style))
story.append(Spacer(1, 0.3 * inch))
# Warning
warning_style = ParagraphStyle(
'Warning',
parent=styles['Normal'],
fontSize=10,
textColor=colors.HexColor('#E53E3E'),
spaceAfter=20,
leftIndent=20,
rightIndent=20,
)
story.append(Paragraph(
'<b>⚠️ IMPORTANT:</b> Store these codes securely. Each code can only be used once. '
'If you lose access to your 2FA device, you can use these codes to log in.',
warning_style
))
story.append(Spacer(1, 0.3 * inch))
# Codes table
data = [['#', 'Backup Code']]
for i, code in enumerate(codes, 1):
data.append([str(i), code])
table = Table(data, colWidths=[0.5 * inch, 3 * inch])
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#4ECDC4')),
('TEXTCOLOR', (0, 0), (-1, 0), colors.HexColor('#1D3557')),
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.white),
('TEXTCOLOR', (0, 1), (-1, -1), colors.HexColor('#2D3748')),
('FONTNAME', (0, 1), (-1, -1), 'Courier'),
('FONTSIZE', (0, 1), (-1, -1), 11),
('GRID', (0, 0), (-1, -1), 1, colors.HexColor('#E2E8F0')),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
('LEFTPADDING', (0, 0), (-1, -1), 10),
('RIGHTPADDING', (0, 0), (-1, -1), 10),
('TOPPADDING', (0, 1), (-1, -1), 8),
('BOTTOMPADDING', (0, 1), (-1, -1), 8),
]))
story.append(table)
# Footer
story.append(Spacer(1, 0.5 * inch))
footer_style = ParagraphStyle(
'Footer',
parent=styles['Normal'],
fontSize=9,
textColor=colors.HexColor('#A0AEC0'),
alignment=1, # Center
)
story.append(Paragraph('Keep this document in a safe place', footer_style))
# Build PDF
doc.build(story)
buffer.seek(0)
return buffer

43
backend/user/urls.py Normal file
View file

@ -0,0 +1,43 @@
"""User URL patterns"""
from django.urls import path, include
from user.views import (
LoginView,
LogoutView,
RegisterView,
UserAccountView,
UserProfileView,
ChangePasswordView,
UserConfigView,
TwoFactorStatusView,
TwoFactorSetupView,
TwoFactorVerifyView,
TwoFactorDisableView,
TwoFactorRegenerateCodesView,
TwoFactorDownloadCodesView,
AvatarUploadView,
AvatarPresetView,
AvatarFileView,
)
urlpatterns = [
path('account/', UserAccountView.as_view(), name='user-account'),
path('profile/', UserProfileView.as_view(), name='user-profile'),
path('change-password/', ChangePasswordView.as_view(), name='change-password'),
path('login/', LoginView.as_view(), name='user-login'),
path('logout/', LogoutView.as_view(), name='user-logout'),
path('register/', RegisterView.as_view(), name='user-register'), # Returns 403 - disabled
path('config/', UserConfigView.as_view(), name='user-config'),
path('2fa/status/', TwoFactorStatusView.as_view(), name='2fa-status'),
path('2fa/setup/', TwoFactorSetupView.as_view(), name='2fa-setup'),
path('2fa/verify/', TwoFactorVerifyView.as_view(), name='2fa-verify'),
path('2fa/disable/', TwoFactorDisableView.as_view(), name='2fa-disable'),
path('2fa/regenerate-codes/', TwoFactorRegenerateCodesView.as_view(), name='2fa-regenerate'),
path('2fa/download-codes/', TwoFactorDownloadCodesView.as_view(), name='2fa-download'),
# Avatar management
path('avatar/upload/', AvatarUploadView.as_view(), name='avatar-upload'),
path('avatar/preset/', AvatarPresetView.as_view(), name='avatar-preset'),
path('avatar/file/<str:filename>/', AvatarFileView.as_view(), name='avatar-file'),
# Admin user management
path('', include('user.urls_admin')),
]

View file

@ -0,0 +1,12 @@
"""URL configuration for admin user management"""
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from user.views_admin import UserManagementViewSet, UserYouTubeAccountViewSet
router = DefaultRouter()
router.register(r'users', UserManagementViewSet, basename='admin-users')
router.register(r'youtube-accounts', UserYouTubeAccountViewSet, basename='youtube-accounts')
urlpatterns = [
path('admin/', include(router.urls)),
]

591
backend/user/views.py Normal file
View file

@ -0,0 +1,591 @@
"""User API views"""
import os
import mimetypes
from pathlib import Path
from django.contrib.auth import authenticate, login, logout
from django.http import HttpResponse, FileResponse
from rest_framework import status
from rest_framework.authtoken.models import Token
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.parsers import MultiPartParser, FormParser
from user.models import Account
from user.serializers import (
AccountSerializer,
LoginSerializer,
UserConfigSerializer,
TwoFactorSetupSerializer,
TwoFactorVerifySerializer,
TwoFactorStatusSerializer,
)
from user.two_factor import (
generate_totp_secret,
get_totp_uri,
generate_qr_code,
verify_totp,
generate_backup_codes,
generate_backup_codes_pdf,
)
from datetime import datetime
class UserAccountView(APIView):
"""User account endpoint"""
def get(self, request):
"""Get current user account"""
user = request.user
# Calculate current storage usage
user.calculate_storage_usage()
serializer = AccountSerializer(user)
return Response(serializer.data)
class UserProfileView(APIView):
"""User profile management"""
def patch(self, request):
"""Update user profile (username, email, first_name, last_name)"""
user = request.user
username = request.data.get('username')
email = request.data.get('email')
first_name = request.data.get('first_name')
last_name = request.data.get('last_name')
current_password = request.data.get('current_password', '').strip()
# At least one field must be provided
if not username and not email and first_name is None and last_name is None:
return Response(
{'error': 'At least one field (username, email, first_name, last_name) must be provided'},
status=status.HTTP_400_BAD_REQUEST
)
# Password is required to change username or email (security critical fields)
if (username or email) and not current_password:
return Response(
{'error': 'Current password is required to change username or email'},
status=status.HTTP_400_BAD_REQUEST
)
# Verify current password only if it's provided (for username/email changes)
if current_password and not user.check_password(current_password):
return Response(
{'error': 'Current password is incorrect'},
status=status.HTTP_401_UNAUTHORIZED
)
# Validate username
if username:
username = username.strip()
if len(username) < 3:
return Response(
{'error': 'Username must be at least 3 characters long'},
status=status.HTTP_400_BAD_REQUEST
)
if not username.isalnum() and '_' not in username:
return Response(
{'error': 'Username can only contain letters, numbers, and underscores'},
status=status.HTTP_400_BAD_REQUEST
)
# Check if username is already taken
if Account.objects.exclude(id=user.id).filter(username=username).exists():
return Response(
{'error': 'Username already taken'},
status=status.HTTP_400_BAD_REQUEST
)
# Check if email is already taken
if email and Account.objects.exclude(id=user.id).filter(email=email).exists():
return Response(
{'error': 'Email already in use'},
status=status.HTTP_400_BAD_REQUEST
)
# Update fields
updated_fields = []
if username:
user.username = username
updated_fields.append('username')
if email:
user.email = email
updated_fields.append('email')
if first_name is not None:
user.first_name = first_name
updated_fields.append('name')
if last_name is not None:
user.last_name = last_name
if 'name' not in updated_fields:
updated_fields.append('name')
user.save()
return Response({
'message': f'{" and ".join(updated_fields).capitalize()} updated successfully',
'user': AccountSerializer(user).data
})
class ChangePasswordView(APIView):
"""Change user password"""
def post(self, request):
"""Change password"""
user = request.user
current_password = request.data.get('current_password')
new_password = request.data.get('new_password')
if not current_password or not new_password:
return Response(
{'error': 'Current and new password are required'},
status=status.HTTP_400_BAD_REQUEST
)
# Verify current password
if not user.check_password(current_password):
return Response(
{'error': 'Current password is incorrect'},
status=status.HTTP_401_UNAUTHORIZED
)
# Validate new password length
if len(new_password) < 8:
return Response(
{'error': 'Password must be at least 8 characters long'},
status=status.HTTP_400_BAD_REQUEST
)
# Set new password
user.set_password(new_password)
user.save()
# Delete old token and create new one for security
Token.objects.filter(user=user).delete()
new_token = Token.objects.create(user=user)
return Response({
'message': 'Password changed successfully',
'token': new_token.key # Return new token so user stays logged in
})
class LoginView(APIView):
"""Login endpoint"""
permission_classes = [AllowAny]
def post(self, request):
"""Authenticate user"""
serializer = LoginSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = authenticate(
username=serializer.validated_data['username'],
password=serializer.validated_data['password']
)
if not user:
return Response(
{'error': 'Invalid credentials'},
status=status.HTTP_401_UNAUTHORIZED
)
# Check if 2FA is enabled
if user.two_factor_enabled:
two_factor_code = serializer.validated_data.get('two_factor_code')
if not two_factor_code:
return Response({
'requires_2fa': True,
'message': 'Two-factor authentication required'
}, status=status.HTTP_200_OK)
# Verify TOTP code
if user.two_factor_secret and verify_totp(user.two_factor_secret, two_factor_code):
pass # Code is valid, continue login
# Check backup codes
elif two_factor_code in user.backup_codes:
# Remove used backup code
user.backup_codes.remove(two_factor_code)
user.save()
else:
return Response(
{'error': 'Invalid two-factor code'},
status=status.HTTP_401_UNAUTHORIZED
)
login(request, user)
token, _ = Token.objects.get_or_create(user=user)
return Response({
'token': token.key,
'user': AccountSerializer(user).data
})
class LogoutView(APIView):
"""Logout endpoint"""
def post(self, request):
"""Logout user and delete token"""
# Delete the user's token for security
if request.user.is_authenticated:
try:
Token.objects.filter(user=request.user).delete()
except Token.DoesNotExist:
pass
logout(request)
return Response({'message': 'Logged out successfully'})
class RegisterView(APIView):
"""
Registration endpoint - DISABLED
Public registration is not allowed. Only admins can create new users.
"""
permission_classes = [AllowAny]
def post(self, request):
"""Block public registration"""
from config.user_settings import ALLOW_PUBLIC_REGISTRATION
if not ALLOW_PUBLIC_REGISTRATION:
return Response(
{
'error': 'Public registration is disabled',
'message': 'New users can only be created by administrators. Please contact your system administrator for account creation.'
},
status=status.HTTP_403_FORBIDDEN
)
# If registration is enabled in settings, this would handle it
# This code is kept for potential future use
return Response(
{'error': 'Registration endpoint not implemented'},
status=status.HTTP_501_NOT_IMPLEMENTED
)
class UserConfigView(APIView):
"""User configuration endpoint"""
def get(self, request):
"""Get user configuration"""
# TODO: Implement user config storage
config = {
'theme': 'dark',
'items_per_page': 50,
'audio_quality': 'best'
}
serializer = UserConfigSerializer(config)
return Response(serializer.data)
def post(self, request):
"""Update user configuration"""
serializer = UserConfigSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
# TODO: Store user config
return Response(serializer.data)
class TwoFactorStatusView(APIView):
"""Get 2FA status"""
def get(self, request):
"""Get 2FA status for current user"""
user = request.user
serializer = TwoFactorStatusSerializer({
'enabled': user.two_factor_enabled,
'backup_codes_count': len(user.backup_codes) if user.backup_codes else 0
})
return Response(serializer.data)
class TwoFactorSetupView(APIView):
"""Setup 2FA"""
def post(self, request):
"""Generate 2FA secret and QR code"""
user = request.user
# Generate new secret
secret = generate_totp_secret()
uri = get_totp_uri(secret, user.username)
qr_code = generate_qr_code(uri)
# Generate backup codes
backup_codes = generate_backup_codes()
# Store secret temporarily (not enabled yet)
user.two_factor_secret = secret
user.backup_codes = backup_codes
user.save()
serializer = TwoFactorSetupSerializer({
'secret': secret,
'qr_code': qr_code,
'backup_codes': backup_codes
})
return Response(serializer.data)
class TwoFactorVerifyView(APIView):
"""Verify and enable 2FA"""
def post(self, request):
"""Verify 2FA code and enable"""
serializer = TwoFactorVerifySerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = request.user
code = serializer.validated_data['code']
if not user.two_factor_secret:
return Response(
{'error': 'No 2FA setup found. Please setup 2FA first.'},
status=status.HTTP_400_BAD_REQUEST
)
if verify_totp(user.two_factor_secret, code):
user.two_factor_enabled = True
user.save()
return Response({
'message': 'Two-factor authentication enabled successfully',
'enabled': True
})
return Response(
{'error': 'Invalid verification code'},
status=status.HTTP_400_BAD_REQUEST
)
class TwoFactorDisableView(APIView):
"""Disable 2FA"""
def post(self, request):
"""Disable 2FA for user"""
serializer = TwoFactorVerifySerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = request.user
code = serializer.validated_data['code']
if not user.two_factor_enabled:
return Response(
{'error': 'Two-factor authentication is not enabled'},
status=status.HTTP_400_BAD_REQUEST
)
# Verify code before disabling
if verify_totp(user.two_factor_secret, code) or code in user.backup_codes:
user.two_factor_enabled = False
user.two_factor_secret = None
user.backup_codes = []
user.save()
return Response({
'message': 'Two-factor authentication disabled successfully',
'enabled': False
})
return Response(
{'error': 'Invalid verification code'},
status=status.HTTP_400_BAD_REQUEST
)
class TwoFactorRegenerateCodesView(APIView):
"""Regenerate backup codes"""
def post(self, request):
"""Generate new backup codes"""
user = request.user
if not user.two_factor_enabled:
return Response(
{'error': 'Two-factor authentication is not enabled'},
status=status.HTTP_400_BAD_REQUEST
)
# Generate new backup codes
backup_codes = generate_backup_codes()
user.backup_codes = backup_codes
user.save()
return Response({
'backup_codes': backup_codes,
'message': 'Backup codes regenerated successfully'
})
class TwoFactorDownloadCodesView(APIView):
"""Download backup codes as PDF"""
def get(self, request):
"""Generate and download backup codes PDF"""
user = request.user
if not user.two_factor_enabled or not user.backup_codes:
return Response(
{'error': 'No backup codes available'},
status=status.HTTP_400_BAD_REQUEST
)
# Generate PDF
pdf_buffer = generate_backup_codes_pdf(user.username, user.backup_codes)
# Create filename: username_SoundWave_BackupCodes_YYYY-MM-DD.pdf
filename = f"{user.username}_SoundWave_BackupCodes_{datetime.now().strftime('%Y-%m-%d')}.pdf"
response = HttpResponse(pdf_buffer, content_type='application/pdf')
response['Content-Disposition'] = f'attachment; filename="{filename}"'
return response
class AvatarUploadView(APIView):
"""Upload user avatar"""
parser_classes = [MultiPartParser, FormParser]
# Avatar directory - persistent storage
AVATAR_DIR = Path('/app/data/avatars')
MAX_SIZE = 20 * 1024 * 1024 # 20MB
ALLOWED_TYPES = {'image/jpeg', 'image/png', 'image/gif', 'image/webp'}
def post(self, request):
"""Upload custom avatar image"""
if 'avatar' not in request.FILES:
return Response(
{'error': 'No avatar file provided'},
status=status.HTTP_400_BAD_REQUEST
)
avatar_file = request.FILES['avatar']
# Validate file size
if avatar_file.size > self.MAX_SIZE:
return Response(
{'error': f'File too large. Maximum size is {self.MAX_SIZE // (1024*1024)}MB'},
status=status.HTTP_400_BAD_REQUEST
)
# Validate content type
content_type = avatar_file.content_type
if content_type not in self.ALLOWED_TYPES:
return Response(
{'error': f'Invalid file type. Allowed types: {", ".join(self.ALLOWED_TYPES)}'},
status=status.HTTP_400_BAD_REQUEST
)
# Create avatars directory if it doesn't exist
self.AVATAR_DIR.mkdir(parents=True, exist_ok=True)
# Generate safe filename: username_timestamp.ext
ext = Path(avatar_file.name).suffix or '.jpg'
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{request.user.username}_{timestamp}{ext}"
filepath = self.AVATAR_DIR / filename
# Remove old avatar file if it exists and is not a preset
if request.user.avatar and not request.user.avatar.startswith('preset_'):
old_path = self.AVATAR_DIR / request.user.avatar.split('/')[-1]
if old_path.exists():
old_path.unlink()
# Save file
with open(filepath, 'wb+') as destination:
for chunk in avatar_file.chunks():
destination.write(chunk)
# Update user model
request.user.avatar = f"avatars/{filename}"
request.user.save()
return Response({
'message': 'Avatar uploaded successfully',
'avatar': request.user.avatar
})
def delete(self, request):
"""Remove custom avatar and reset to default"""
user = request.user
# Remove file if it exists and is not a preset
if user.avatar and not user.avatar.startswith('preset_'):
filepath = self.AVATAR_DIR / user.avatar.split('/')[-1]
if filepath.exists():
filepath.unlink()
user.avatar = None
user.save()
return Response({
'message': 'Avatar removed successfully'
})
class AvatarPresetView(APIView):
"""Set preset avatar"""
def post(self, request):
"""Set preset avatar (1-5)"""
preset = request.data.get('preset')
if not preset or not str(preset).isdigit():
return Response(
{'error': 'Invalid preset number'},
status=status.HTTP_400_BAD_REQUEST
)
preset_num = int(preset)
if preset_num < 1 or preset_num > 5:
return Response(
{'error': 'Preset must be between 1 and 5'},
status=status.HTTP_400_BAD_REQUEST
)
# Remove old custom avatar file if exists
user = request.user
if user.avatar and not user.avatar.startswith('preset_'):
avatar_dir = Path('/app/data/avatars')
filepath = avatar_dir / user.avatar.split('/')[-1]
if filepath.exists():
filepath.unlink()
# Set preset
user.avatar = f"preset_{preset_num}"
user.save()
return Response({
'message': 'Preset avatar set successfully',
'avatar': user.avatar
})
class AvatarFileView(APIView):
"""Serve avatar files"""
def get(self, request, filename):
"""Serve avatar file"""
avatar_dir = Path('/app/data/avatars')
filepath = avatar_dir / filename
# Security: validate path
if not filepath.resolve().is_relative_to(avatar_dir.resolve()):
return Response(
{'error': 'Invalid path'},
status=status.HTTP_403_FORBIDDEN
)
if not filepath.exists():
return Response(
{'error': 'Avatar not found'},
status=status.HTTP_404_NOT_FOUND
)
# Determine content type
content_type, _ = mimetypes.guess_type(str(filepath))
if not content_type:
content_type = 'application/octet-stream'
return FileResponse(open(filepath, 'rb'), content_type=content_type)

215
backend/user/views_admin.py Normal file
View file

@ -0,0 +1,215 @@
"""Admin views for user management"""
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from django.db.models import Count, Sum, Q
from django.contrib.auth import get_user_model
from user.models import UserYouTubeAccount
from user.serializers_admin import (
UserDetailSerializer,
UserCreateSerializer,
UserUpdateSerializer,
UserStatsSerializer,
UserYouTubeAccountSerializer,
UserYouTubeAccountCreateSerializer,
)
from channel.models import Channel
from playlist.models import Playlist
from audio.models import Audio
User = get_user_model()
class IsAdminOrSelf(IsAuthenticated):
"""Permission: Admin can access all, users can access only their own data"""
def has_object_permission(self, request, view, obj):
if request.user.is_admin or request.user.is_superuser:
return True
if hasattr(obj, 'owner'):
return obj.owner == request.user
if hasattr(obj, 'user'):
return obj.user == request.user
return obj == request.user
class UserManagementViewSet(viewsets.ModelViewSet):
"""Admin viewset for managing users"""
queryset = User.objects.all()
permission_classes = [IsAdminUser]
def get_serializer_class(self):
if self.action == 'create':
return UserCreateSerializer
elif self.action in ['update', 'partial_update']:
return UserUpdateSerializer
return UserDetailSerializer
def get_queryset(self):
"""Filter users based on permissions"""
queryset = User.objects.all()
# Admin sees all, regular users see only themselves
if not (self.request.user.is_admin or self.request.user.is_superuser):
queryset = queryset.filter(id=self.request.user.id)
# Add annotations
queryset = queryset.annotate(
channels_count=Count('channels', distinct=True),
playlists_count=Count('playlists', distinct=True),
audio_count=Count('audio_files', distinct=True),
)
return queryset.order_by('-date_joined')
@action(detail=True, methods=['get'])
def stats(self, request, pk=None):
"""Get detailed statistics for a user"""
user = self.get_object()
stats = {
'total_channels': Channel.objects.filter(owner=user).count(),
'active_channels': Channel.objects.filter(owner=user, subscribed=True).count(),
'total_playlists': Playlist.objects.filter(owner=user).count(),
'subscribed_playlists': Playlist.objects.filter(owner=user, subscribed=True).count(),
'total_audio_files': Audio.objects.filter(owner=user).count(),
'storage_used_gb': user.storage_used_gb,
'storage_quota_gb': user.storage_quota_gb,
'storage_percent': user.storage_percent_used,
'youtube_accounts': UserYouTubeAccount.objects.filter(user=user).count(),
}
serializer = UserStatsSerializer(stats)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def reset_storage(self, request, pk=None):
"""Reset user storage usage"""
user = self.get_object()
user.storage_used_gb = 0.0
user.save()
return Response({'message': 'Storage reset successfully'})
@action(detail=True, methods=['post'])
def reset_2fa(self, request, pk=None):
"""Reset user 2FA"""
user = self.get_object()
user.two_factor_enabled = False
user.two_factor_secret = ''
user.backup_codes = []
user.save()
return Response({'message': '2FA reset successfully'})
@action(detail=True, methods=['post'])
def toggle_active(self, request, pk=None):
"""Toggle user active status"""
user = self.get_object()
user.is_active = not user.is_active
user.save()
return Response({
'message': f'User {"activated" if user.is_active else "deactivated"}',
'is_active': user.is_active
})
@action(detail=True, methods=['get'])
def channels(self, request, pk=None):
"""Get user's channels"""
user = self.get_object()
channels = Channel.objects.filter(owner=user).values(
'id', 'channel_name', 'channel_id', 'subscribed', 'video_count'
)
return Response(channels)
@action(detail=True, methods=['get'])
def playlists(self, request, pk=None):
"""Get user's playlists"""
user = self.get_object()
playlists = Playlist.objects.filter(owner=user).values(
'id', 'title', 'playlist_id', 'subscribed', 'playlist_type'
)
return Response(playlists)
@action(detail=False, methods=['get'])
def system_stats(self, request):
"""Get system-wide statistics"""
total_users = User.objects.count()
active_users = User.objects.filter(is_active=True).count()
admin_users = User.objects.filter(Q(is_admin=True) | Q(is_superuser=True)).count()
total_channels = Channel.objects.count()
total_playlists = Playlist.objects.count()
total_audio = Audio.objects.count()
total_storage = User.objects.aggregate(
used=Sum('storage_used_gb'),
quota=Sum('storage_quota_gb')
)
return Response({
'users': {
'total': total_users,
'active': active_users,
'admin': admin_users,
},
'content': {
'channels': total_channels,
'playlists': total_playlists,
'audio_files': total_audio,
},
'storage': {
'used_gb': total_storage['used'] or 0,
'quota_gb': total_storage['quota'] or 0,
}
})
class UserYouTubeAccountViewSet(viewsets.ModelViewSet):
"""ViewSet for managing user YouTube accounts"""
permission_classes = [IsAdminOrSelf]
def get_serializer_class(self):
if self.action == 'create':
return UserYouTubeAccountCreateSerializer
return UserYouTubeAccountSerializer
def get_queryset(self):
"""Filter by user"""
queryset = UserYouTubeAccount.objects.all()
# Regular users see only their accounts
if not (self.request.user.is_admin or self.request.user.is_superuser):
queryset = queryset.filter(user=self.request.user)
# Filter by user_id if provided
user_id = self.request.query_params.get('user_id')
if user_id:
queryset = queryset.filter(user_id=user_id)
return queryset.order_by('-created_date')
def perform_create(self, serializer):
"""Set user from request"""
serializer.save(user=self.request.user)
@action(detail=True, methods=['post'])
def verify(self, request, pk=None):
"""Verify YouTube account credentials"""
account = self.get_object()
# TODO: Implement actual verification logic
from django.utils import timezone
account.last_verified = timezone.now()
account.save()
return Response({'message': 'Account verified successfully'})
@action(detail=True, methods=['post'])
def toggle_active(self, request, pk=None):
"""Toggle account active status"""
account = self.get_object()
account.is_active = not account.is_active
account.save()
return Response({
'message': f'Account {"activated" if account.is_active else "deactivated"}',
'is_active': account.is_active
})