soundwave/backend/task/tasks.py

508 lines
19 KiB
Python
Raw Permalink Normal View History

"""Celery tasks for background processing"""
from celery import shared_task
import yt_dlp
from audio.models import Audio
from channel.models import Channel
from download.models import DownloadQueue
from datetime import datetime
from django.utils import timezone
import os
@shared_task
def download_audio_task(queue_id):
    """Download the audio of one queued YouTube URL (AUDIO ONLY, no video).

    The ``DownloadQueue`` row is moved to ``downloading`` and then to
    ``completed`` (or ``failed``). yt-dlp fetches the best audio stream and
    the FFmpegExtractAudio postprocessor converts it to ``.m4a`` under
    ``/app/audio/<channel>/``. An ``Audio`` row is created for the queue
    owner unless one already exists for the same ``youtube_id``. A
    ``link_audio_to_playlists`` task is then queued for that audio.

    Args:
        queue_id: Primary key of the ``DownloadQueue`` row to process.

    Returns:
        ``"Downloaded: <title>"`` on success.

    Raises:
        DownloadQueue.DoesNotExist: if there is no queue row with ``queue_id``.
        Exception: any download or database error is re-raised after the
            queue row has been marked ``failed`` with the error text.
    """
    audio_root = '/app/audio/'
    # Stays None if the lookup itself fails, so the handler below does not
    # touch an unbound name and mask the real exception.
    queue_item = None
    try:
        queue_item = DownloadQueue.objects.get(id=queue_id)
        queue_item.status = 'downloading'
        queue_item.started_date = timezone.now()
        queue_item.save()

        # yt-dlp options for AUDIO ONLY (no video)
        ydl_opts = {
            # Best audio-only stream; fall back to the best overall format,
            # whose audio track the postprocessor below extracts.
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'm4a',
                'preferredquality': '192',
            }],
            'outtmpl': audio_root + '%(channel)s/%(title)s-%(id)s.%(ext)s',
            'quiet': True,
            'no_warnings': True,
            # NOTE(review): extraction is driven by the FFmpegExtractAudio
            # postprocessor above; this key looks like a CLI-level name and
            # may be ignored by the Python API — confirm before relying on it.
            'extract_audio': True,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(queue_item.url, download=True)
            # prepare_filename() may still carry the source extension; after
            # FFmpegExtractAudio the file on disk ends in .m4a.
            downloaded_path = os.path.splitext(ydl.prepare_filename(info))[0] + '.m4a'

        if downloaded_path.startswith(audio_root):
            # Store the path relative to the audio root.
            file_path = downloaded_path[len(audio_root):]
        else:
            # Fallback to a constructed path if prepare_filename doesn't
            # produce a path under the expected root.
            file_path = f"{info.get('channel', 'unknown')}/{info.get('title', 'unknown')}-{info['id']}.m4a"

        # Prefer the converted file's real size; yt-dlp's 'filesize' (often
        # missing) describes the source stream, not the .m4a on disk.
        try:
            file_size = os.path.getsize(downloaded_path)
        except OSError:
            file_size = info.get('filesize') or 0

        # yt-dlp can report a field as None instead of omitting it, so `or`
        # covers both the absent and the None case.
        upload_date = info.get('upload_date') or '20230101'

        audio, created = Audio.objects.get_or_create(
            owner=queue_item.owner,
            youtube_id=info['id'],
            defaults={
                'title': info.get('title') or 'Unknown',
                'description': info.get('description') or '',
                'channel_id': info.get('channel_id') or '',
                'channel_name': info.get('channel') or 'Unknown',
                'duration': info.get('duration') or 0,
                'file_path': file_path,
                'file_size': file_size,
                'thumbnail_url': info.get('thumbnail') or '',
                # NOTE(review): naive datetime; with USE_TZ=True Django will
                # interpret it in the default time zone — confirm intended.
                'published_date': datetime.strptime(upload_date, '%Y%m%d'),
                'view_count': info.get('view_count') or 0,
                'like_count': info.get('like_count') or 0,
            },
        )

        # Playlist linking queries YouTube once per playlist, so it runs as a
        # separate task instead of blocking this download.
        link_audio_to_playlists.delay(audio.id, queue_item.owner.id)

        queue_item.status = 'completed'
        queue_item.completed_date = timezone.now()
        queue_item.youtube_id = info['id']
        queue_item.title = info.get('title') or ''
        queue_item.save()
        return f"Downloaded: {info.get('title', 'Unknown')}"
    except Exception as e:
        if queue_item is not None:
            queue_item.status = 'failed'
            queue_item.error_message = str(e)
            queue_item.save()
        raise
@shared_task
def download_channel_task(channel_id):
    """Smart sync: queue downloads for a channel's NEW videos only.

    Lists at most 50 entries of the channel's ``/videos`` tab with a flat
    (metadata-only) yt-dlp extraction. Videos the channel owner already has
    as ``Audio`` rows are skipped. For each remaining video, a
    ``DownloadQueue`` row is created (if new) and ``download_audio_task`` is
    dispatched for it.

    Args:
        channel_id: Primary key of the ``Channel`` row (not the YouTube ID).

    Returns:
        A human-readable summary string.

    Raises:
        Channel.DoesNotExist: if there is no channel with ``channel_id``.
        Exception: other errors are re-raised after the channel has been
            marked ``failed`` with the error text.
    """
    # Stays None if the lookup fails, so the handler below cannot hit an
    # unbound name and hide the original exception.
    channel = None
    try:
        channel = Channel.objects.get(id=channel_id)
        channel.sync_status = 'syncing'
        channel.error_message = ''
        channel.save()

        url = f"https://www.youtube.com/channel/{channel.channel_id}/videos"
        # Extract flat to get the list quickly without resolving each video.
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
            'playlistend': 50,  # Limit to last 50 videos per sync
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        if not info or 'entries' not in info:
            channel.sync_status = 'failed'
            channel.error_message = 'Failed to fetch channel videos'
            channel.save()
            return "Failed to fetch channel videos"

        # Every video ID the owner already has (from any source) is skipped.
        existing_ids = set(Audio.objects.filter(
            owner=channel.owner
        ).values_list('youtube_id', flat=True))

        new_videos = 0
        skipped = 0
        for entry in info['entries']:
            video_id = entry.get('id') if entry else None
            if not video_id:
                continue
            # SMART SYNC: Skip if already downloaded
            if video_id in existing_ids:
                skipped += 1
                continue
            # This is NEW content
            queue_item, created = DownloadQueue.objects.get_or_create(
                owner=channel.owner,
                url=f"https://www.youtube.com/watch?v={video_id}",
                defaults={
                    'youtube_id': video_id,
                    'title': entry.get('title', 'Unknown'),
                    'status': 'pending',
                    'auto_start': True,
                },
            )
            # Only dispatch for rows created now; pre-existing queue rows are
            # left to whatever already handles them.
            if created:
                new_videos += 1
                download_audio_task.delay(queue_item.id)

        channel.sync_status = 'success'
        # Count this channel's audio only. The previous value,
        # len(existing_ids), was the size of the owner's entire library.
        channel.downloaded_count = Audio.objects.filter(
            owner=channel.owner,
            channel_id=channel.channel_id,
        ).count()
        channel.save()

        if new_videos == 0:
            return f"Channel '{channel.channel_name}' up to date ({skipped} already downloaded)"
        return f"Channel '{channel.channel_name}': {new_videos} new audio(s) queued, {skipped} already downloaded"
    except Exception as e:
        if channel is not None:
            channel.sync_status = 'failed'
            channel.error_message = str(e)
            channel.save()
        raise
@shared_task(bind=True, name="subscribe_to_playlist")
def subscribe_to_playlist(self, user_id, playlist_url):
    """
    TubeArchivist pattern: subscribe a user to a YouTube playlist and kick off
    the audio download.

    Flow: API call -> ``Playlist`` subscription row -> ``download_playlist_task``
    (downloads audio, not video).

    Args:
        user_id: Primary key of the subscribing user.
        playlist_url: A URL carrying a ``list=`` parameter, or a bare playlist
            ID of at least 13 characters starting with ``PL``, ``UU``, ``LL``
            or ``RD``.

    Returns:
        ``"Already subscribed to playlist <id>"`` or
        ``"Subscribed to playlist: <title>"``.

    Raises:
        ValueError: if no playlist ID can be extracted, or if its metadata
            cannot be fetched.
    """
    from django.contrib.auth import get_user_model
    from playlist.models import Playlist
    from common.src.youtube_metadata import get_playlist_metadata
    import re

    owner = get_user_model().objects.get(id=user_id)

    # The first pattern that matches wins. Both capture the value of `list=`;
    # the first pattern already covers every string the second one matches.
    id_patterns = (
        r'[?&]list=([a-zA-Z0-9_-]+)',
        r'playlist\?list=([a-zA-Z0-9_-]+)',
    )
    hits = (re.search(rx, playlist_url) for rx in id_patterns)
    playlist_id = next((hit.group(1) for hit in hits if hit), None)

    # Accept a bare playlist ID pasted without a surrounding URL.
    if playlist_id is None and len(playlist_url) >= 13 and playlist_url.startswith(('PL', 'UU', 'LL', 'RD')):
        playlist_id = playlist_url

    if not playlist_id:
        raise ValueError("Invalid playlist URL")

    already_subscribed = Playlist.objects.filter(owner=owner, playlist_id=playlist_id).exists()
    if already_subscribed:
        return f"Already subscribed to playlist {playlist_id}"

    metadata = get_playlist_metadata(playlist_id)
    if not metadata:
        raise ValueError("Failed to fetch playlist metadata")

    subscription = Playlist.objects.create(
        owner=owner,
        playlist_id=playlist_id,
        title=metadata['title'],
        description=metadata['description'],
        channel_name=metadata['channel_name'],
        channel_id=metadata['channel_id'],
        thumbnail_url=metadata['thumbnail_url'],
        item_count=metadata['item_count'],
        playlist_type='youtube',
        subscribed=True,
        auto_download=True,
        sync_status='pending',
    )

    # Hand off to the sync task, which queues the individual audio downloads.
    download_playlist_task.delay(subscription.id)
    return f"Subscribed to playlist: {metadata['title']}"
@shared_task(bind=True, name="subscribe_to_channel")
def subscribe_to_channel(self, user_id, channel_url):
    """
    TubeArchivist pattern: subscribe a user to a YouTube channel and trigger
    the audio download.

    Flow: API call -> ``Channel`` subscription row -> ``download_channel_task``
    (downloads audio, not video).

    Args:
        user_id: Primary key of the subscribing user.
        channel_url: A channel URL (``/channel/UC…``, ``/@handle``, ``/c/…``,
            ``/user/…``) or any other string. A string that matches none of
            these forms (e.g. a bare ``UC…`` ID) is passed to the metadata
            lookup as-is.

    Returns:
        ``"Already subscribed to channel <id>"`` or
        ``"Subscribed to channel: <name>"``.

    Raises:
        ValueError: if channel metadata cannot be fetched.
    """
    from django.contrib.auth import get_user_model
    from channel.models import Channel
    from common.src.youtube_metadata import get_channel_metadata
    import re

    User = get_user_model()
    user = User.objects.get(id=user_id)

    if isinstance(channel_url, str):
        # Pasted values often carry stray whitespace/newlines, which would
        # otherwise be forwarded to the metadata lookup verbatim.
        channel_url = channel_url.strip()

    # Extract channel ID / handle / custom name from URL
    patterns = [
        r'youtube\.com/channel/(UC[\w-]+)',
        # Handles may contain periods as well as letters, digits, '_' and '-';
        # without '.' a handle like @foo.bar would be truncated to "foo".
        r'youtube\.com/@([\w.-]+)',
        r'youtube\.com/c/([\w-]+)',
        r'youtube\.com/user/([\w-]+)',
    ]
    channel_id = None
    for pattern in patterns:
        match = re.search(pattern, channel_url)
        if match:
            channel_id = match.group(1)
            break

    if not channel_id:
        # Try as-is; this also covers a bare 24-character "UC…" channel ID.
        channel_id = channel_url

    # Fetch metadata (this resolves handles to actual channel IDs)
    metadata = get_channel_metadata(channel_id)
    if not metadata:
        raise ValueError("Failed to fetch channel metadata")
    actual_channel_id = metadata['channel_id']

    # De-duplicate on the resolved ID, not on whatever form the user pasted.
    if Channel.objects.filter(owner=user, channel_id=actual_channel_id).exists():
        return f"Already subscribed to channel {actual_channel_id}"

    channel = Channel.objects.create(
        owner=user,
        channel_id=actual_channel_id,
        channel_name=metadata['channel_name'],
        channel_description=metadata['channel_description'],
        channel_thumbnail=metadata['channel_thumbnail'],
        subscriber_count=metadata['subscriber_count'],
        video_count=metadata['video_count'],
        subscribed=True,
        auto_download=True,
        sync_status='pending',
    )

    # Trigger audio download task
    download_channel_task.delay(channel.id)
    return f"Subscribed to channel: {metadata['channel_name']}"
@shared_task(name="update_subscriptions")
def update_subscriptions_task():
    """
    TubeArchivist pattern: periodic task that re-syncs ALL auto-download
    subscriptions to pick up NEW audio.

    The original notes say this runs every 2 hours via Celery Beat; the
    schedule itself is configured outside this function.

    Queues one ``download_playlist_task`` per subscribed playlist and one
    ``download_channel_task`` per subscribed channel, in both cases only
    where ``auto_download`` is enabled.

    Returns:
        ``"Syncing <p> playlists and <c> channels"``, where the counts are
        the number of tasks actually dispatched.
    """
    from playlist.models import Playlist

    # Only primary keys are needed. Materialising the list once means the
    # reported counts match what was dispatched, with no extra COUNT queries.
    playlist_ids = list(
        Playlist.objects.filter(subscribed=True, auto_download=True).values_list('id', flat=True)
    )
    for pk in playlist_ids:
        download_playlist_task.delay(pk)

    channel_ids = list(
        Channel.objects.filter(subscribed=True, auto_download=True).values_list('id', flat=True)
    )
    for pk in channel_ids:
        download_channel_task.delay(pk)

    return f"Syncing {len(playlist_ids)} playlists and {len(channel_ids)} channels"
@shared_task
def download_playlist_task(playlist_id):
    """Smart sync: queue downloads only for playlist videos the owner lacks.

    Fetches the playlist's flat (metadata-only) entry list via yt-dlp and
    updates ``item_count``. For each entry:

    * If the owner already has the ``Audio``, a ``PlaylistItem`` link is
      ensured at the entry's position.
    * Otherwise a ``DownloadQueue`` row is created and, if it is new,
      ``download_audio_task`` is dispatched.

    Finally ``downloaded_count`` and ``last_refresh`` are refreshed.

    Args:
        playlist_id: Primary key of the ``Playlist`` row (not the YouTube ID).

    Returns:
        A human-readable summary string.

    Raises:
        Playlist.DoesNotExist: if there is no playlist with ``playlist_id``.
        Exception: other errors are re-raised after the playlist has been
            marked ``failed`` with the error text.
    """
    from playlist.models import Playlist, PlaylistItem

    # Stays None if the lookup fails, so the handler below cannot hit an
    # unbound name and hide the original exception.
    playlist = None
    try:
        playlist = Playlist.objects.get(id=playlist_id)
        playlist.sync_status = 'syncing'
        playlist.error_message = ''
        playlist.save()

        url = f"https://www.youtube.com/playlist?list={playlist.playlist_id}"
        # Extract flat to get the list quickly without downloading.
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        if not info or 'entries' not in info:
            playlist.sync_status = 'failed'
            playlist.error_message = 'Failed to fetch playlist items'
            playlist.save()
            return "Failed to fetch playlist items"

        # Materialise once, because the entries are walked several times below.
        # Falsy placeholders may appear and are ignored.
        entries = list(info['entries'])
        playlist.item_count = sum(1 for e in entries if e)
        playlist_video_ids = [e.get('id') for e in entries if e and e.get('id')]

        # One query for all audio the owner already has from this playlist.
        # This replaces one query per entry; the library-wide ID set that
        # used to be built here was never read.
        existing_audio = {}
        for audio_obj in Audio.objects.filter(
            owner=playlist.owner,
            youtube_id__in=playlist_video_ids,
        ):
            existing_audio.setdefault(audio_obj.youtube_id, audio_obj)

        new_videos = 0
        skipped = 0
        for idx, entry in enumerate(entries):
            if not entry:
                continue
            video_id = entry.get('id')
            if not video_id:
                continue

            audio_obj = existing_audio.get(video_id)
            if audio_obj is not None:
                # Already downloaded: make sure it is linked to this playlist.
                PlaylistItem.objects.get_or_create(
                    playlist=playlist,
                    audio=audio_obj,
                    defaults={'position': idx},
                )
                skipped += 1
                continue

            # This is NEW content - add to download queue
            queue_item, created = DownloadQueue.objects.get_or_create(
                owner=playlist.owner,
                url=f"https://www.youtube.com/watch?v={video_id}",
                defaults={
                    'youtube_id': video_id,
                    'title': entry.get('title', 'Unknown'),
                    'status': 'pending',
                    'auto_start': True,
                },
            )
            if created:
                new_videos += 1
                download_audio_task.delay(queue_item.id)
            # The Audio row does not exist yet. download_audio_task queues
            # link_audio_to_playlists, which creates the PlaylistItem later.

        playlist.sync_status = 'success'
        playlist.last_refresh = timezone.now()
        # Count only audio belonging to THIS playlist's video IDs.
        playlist.downloaded_count = Audio.objects.filter(
            owner=playlist.owner,
            youtube_id__in=playlist_video_ids,
        ).count()
        playlist.save()

        if new_videos == 0:
            return f"Playlist '{playlist.title}' up to date ({skipped} already downloaded)"
        return f"Playlist '{playlist.title}': {new_videos} new audio(s) queued, {skipped} already downloaded"
    except Exception as e:
        if playlist is not None:
            playlist.sync_status = 'failed'
            playlist.error_message = str(e)
            playlist.save()
        raise
@shared_task
def link_audio_to_playlists(audio_id, user_id):
    """Best-effort: link a newly downloaded audio to the owner's playlists.

    For each of the user's ``playlist_type='youtube'`` playlists that is not
    already linked to the audio, the playlist's flat entry list is fetched
    from YouTube. If the audio's ``youtube_id`` appears in it, a
    ``PlaylistItem`` is created at that position and the playlist's
    ``downloaded_count`` is refreshed.

    Failures never propagate. Per-playlist errors are logged and that
    playlist is skipped; any other error is logged and reported in the
    returned string.

    Args:
        audio_id: Primary key of the ``Audio`` row.
        user_id: Primary key of the owning user.

    Returns:
        ``"Linked audio <youtube_id> to playlists"`` or
        ``"Failed to link audio: <error>"``.
    """
    import logging

    from playlist.models import Playlist, PlaylistItem
    from django.contrib.auth import get_user_model

    logger = logging.getLogger(__name__)
    # Loop-invariant: the same flat (metadata-only) extraction is used for
    # every playlist.
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': True,
    }
    try:
        User = get_user_model()
        user = User.objects.get(id=user_id)
        audio = Audio.objects.get(id=audio_id)

        playlists = Playlist.objects.filter(owner=user, playlist_type='youtube')
        for playlist in playlists:
            if PlaylistItem.objects.filter(playlist=playlist, audio=audio).exists():
                continue
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    playlist_info = ydl.extract_info(
                        f"https://www.youtube.com/playlist?list={playlist.playlist_id}",
                        download=False,
                    )
                entries = list((playlist_info or {}).get('entries') or [])
                position = next(
                    (idx for idx, entry in enumerate(entries)
                     if entry and entry.get('id') == audio.youtube_id),
                    None,
                )
                if position is None:
                    continue

                PlaylistItem.objects.get_or_create(
                    playlist=playlist,
                    audio=audio,
                    defaults={'position': position},
                )
                all_video_ids = [e.get('id') for e in entries if e and e.get('id')]
                playlist.downloaded_count = Audio.objects.filter(
                    owner=user,
                    youtube_id__in=all_video_ids,
                ).count()
                playlist.save(update_fields=['downloaded_count'])
            except Exception:
                # Best effort: one private/unreachable playlist must not stop
                # the others, but the failure should still be visible.
                logger.warning(
                    "Could not check playlist %s for audio %s",
                    playlist.playlist_id, audio.youtube_id, exc_info=True,
                )
        return f"Linked audio {audio.youtube_id} to playlists"
    except Exception as e:
        # Don't fail - this is a best-effort operation, but record why.
        logger.warning("Failed to link audio %s to playlists", audio_id, exc_info=True)
        return f"Failed to link audio: {str(e)}"
@shared_task
def cleanup_task():
    """Delete completed download-queue rows finished more than 7 days ago.

    Returns:
        ``"Cleaned up <n> items"``, where ``n`` is the total count returned
        by Django's ``QuerySet.delete()``.
    """
    from datetime import timedelta

    stale_before = timezone.now() - timedelta(days=7)
    stale_rows = DownloadQueue.objects.filter(
        status='completed',
        completed_date__lt=stale_before,
    )
    # delete() returns (total_deleted, per_model_breakdown); only the total is reported.
    removed_total, _per_model = stale_rows.delete()
    return f"Cleaned up {removed_total} items"