Fix: Include backend/audio Django app in repository
This commit is contained in:
parent
d04e726373
commit
644cfab298
37 changed files with 6632 additions and 4 deletions
632
backend/audio/id3_service.py
Normal file
632
backend/audio/id3_service.py
Normal file
|
|
@ -0,0 +1,632 @@
|
|||
"""ID3 tag service for reading and writing audio metadata with broad codec support"""
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional
|
||||
from mutagen.mp4 import MP4, MP4Cover
|
||||
from mutagen.id3 import ID3, APIC, TIT2, TPE1, TALB, TPE2, TDRC, TCON, TRCK, TPOS
|
||||
from mutagen.flac import FLAC, Picture
|
||||
from mutagen.oggvorbis import OggVorbis
|
||||
from mutagen.oggopus import OggOpus
|
||||
from mutagen.wavpack import WavPack
|
||||
from mutagen.musepack import Musepack
|
||||
from mutagen.monkeysaudio import MonkeysAudio
|
||||
from mutagen.aiff import AIFF
|
||||
from mutagen.wave import WAVE
|
||||
from mutagen.dsf import DSF
|
||||
from mutagen.dsdiff import DSDIFF
|
||||
from mutagen.mp3 import MP3
|
||||
import base64
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ID3TagService:
|
||||
"""Service for reading and writing ID3 tags with broad codec support including DSD"""
|
||||
|
||||
SUPPORTED_FORMATS = {
|
||||
# Lossy formats
|
||||
'.mp3': 'MP3',
|
||||
'.m4a': 'MP4',
|
||||
'.m4b': 'MP4',
|
||||
'.m4p': 'MP4',
|
||||
'.mp4': 'MP4',
|
||||
'.ogg': 'OGG',
|
||||
'.oga': 'OGG',
|
||||
'.opus': 'OPUS',
|
||||
'.mpc': 'MUSEPACK',
|
||||
|
||||
# Lossless formats
|
||||
'.flac': 'FLAC',
|
||||
'.wv': 'WAVPACK',
|
||||
'.ape': 'APE',
|
||||
'.aiff': 'AIFF',
|
||||
'.aif': 'AIFF',
|
||||
'.aifc': 'AIFF',
|
||||
'.wav': 'WAVE',
|
||||
|
||||
# High-resolution DSD formats
|
||||
'.dsf': 'DSF',
|
||||
'.dff': 'DSDIFF',
|
||||
}
|
||||
|
||||
def read_tags(self, file_path: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Read tags from audio file
|
||||
|
||||
Supports: MP3, MP4/M4A, FLAC, OGG Vorbis, Opus, WavPack, APE, Musepack,
|
||||
DSF, DFF (DSDIFF), AIFF, WAV
|
||||
|
||||
Args:
|
||||
file_path: Path to audio file
|
||||
|
||||
Returns:
|
||||
Dictionary with tag information or None if error
|
||||
"""
|
||||
try:
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
logger.error(f"File not found: {file_path}")
|
||||
return None
|
||||
|
||||
suffix = path.suffix.lower()
|
||||
format_type = self.SUPPORTED_FORMATS.get(suffix)
|
||||
|
||||
if not format_type:
|
||||
logger.warning(f"Unsupported audio format: {suffix}")
|
||||
return None
|
||||
|
||||
# Handle different audio formats
|
||||
if format_type == 'MP4':
|
||||
audio = MP4(file_path)
|
||||
tags = self._read_mp4_tags(audio)
|
||||
elif format_type == 'MP3':
|
||||
audio = MP3(file_path)
|
||||
tags = self._read_id3_tags(audio)
|
||||
elif format_type == 'FLAC':
|
||||
audio = FLAC(file_path)
|
||||
tags = self._read_vorbis_tags(audio)
|
||||
elif format_type in ['OGG', 'OPUS']:
|
||||
audio = OggVorbis(file_path) if format_type == 'OGG' else OggOpus(file_path)
|
||||
tags = self._read_vorbis_tags(audio)
|
||||
elif format_type == 'WAVPACK':
|
||||
audio = WavPack(file_path)
|
||||
tags = self._read_apev2_tags(audio)
|
||||
elif format_type == 'APE':
|
||||
audio = MonkeysAudio(file_path)
|
||||
tags = self._read_apev2_tags(audio)
|
||||
elif format_type == 'MUSEPACK':
|
||||
audio = Musepack(file_path)
|
||||
tags = self._read_apev2_tags(audio)
|
||||
elif format_type == 'DSF':
|
||||
audio = DSF(file_path)
|
||||
tags = self._read_dsf_tags(audio)
|
||||
elif format_type == 'DSDIFF':
|
||||
audio = DSDIFF(file_path)
|
||||
tags = self._read_dsdiff_tags(audio)
|
||||
elif format_type == 'AIFF':
|
||||
audio = AIFF(file_path)
|
||||
tags = self._read_id3_tags(audio)
|
||||
elif format_type == 'WAVE':
|
||||
audio = WAVE(file_path)
|
||||
tags = self._read_id3_tags(audio)
|
||||
else:
|
||||
logger.warning(f"Unsupported format type: {format_type}")
|
||||
return None
|
||||
|
||||
# Add audio properties
|
||||
if hasattr(audio, 'info'):
|
||||
tags['duration'] = getattr(audio.info, 'length', 0)
|
||||
tags['bitrate'] = getattr(audio.info, 'bitrate', 0)
|
||||
# DSD-specific properties
|
||||
if format_type in ['DSF', 'DSDIFF']:
|
||||
tags['sample_rate'] = getattr(audio.info, 'sample_rate', 0)
|
||||
tags['channels'] = getattr(audio.info, 'channels', 0)
|
||||
tags['bits_per_sample'] = getattr(audio.info, 'bits_per_sample', 1)
|
||||
|
||||
tags['format'] = format_type
|
||||
|
||||
return tags
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading tags from {file_path}: {e}")
|
||||
return None
|
||||
|
||||
def _read_mp4_tags(self, audio: MP4) -> Dict[str, Any]:
|
||||
"""Read tags from MP4/M4A file"""
|
||||
tags = {
|
||||
'title': audio.get('\xa9nam', [''])[0] if '\xa9nam' in audio else '',
|
||||
'artist': audio.get('\xa9ART', [''])[0] if '\xa9ART' in audio else '',
|
||||
'album': audio.get('\xa9alb', [''])[0] if '\xa9alb' in audio else '',
|
||||
'album_artist': audio.get('aART', [''])[0] if 'aART' in audio else '',
|
||||
'year': audio.get('\xa9day', [''])[0] if '\xa9day' in audio else '',
|
||||
'genre': audio.get('\xa9gen', [''])[0] if '\xa9gen' in audio else '',
|
||||
'has_cover': 'covr' in audio,
|
||||
}
|
||||
|
||||
# Track number
|
||||
if 'trkn' in audio:
|
||||
track_info = audio['trkn'][0]
|
||||
tags['track_number'] = track_info[0] if track_info[0] > 0 else None
|
||||
|
||||
# Disc number
|
||||
if 'disk' in audio:
|
||||
disc_info = audio['disk'][0]
|
||||
tags['disc_number'] = disc_info[0] if disc_info[0] > 0 else None
|
||||
|
||||
return tags
|
||||
|
||||
def _read_id3_tags(self, audio) -> Dict[str, Any]:
|
||||
"""Read tags from ID3 format (MP3, AIFF, WAV, DSF, DFF)"""
|
||||
tags = self._empty_tags()
|
||||
|
||||
if not hasattr(audio, 'tags') or audio.tags is None:
|
||||
return tags
|
||||
|
||||
id3 = audio.tags
|
||||
|
||||
tags['title'] = str(id3.get('TIT2', ''))
|
||||
tags['artist'] = str(id3.get('TPE1', ''))
|
||||
tags['album'] = str(id3.get('TALB', ''))
|
||||
tags['album_artist'] = str(id3.get('TPE2', ''))
|
||||
tags['genre'] = str(id3.get('TCON', ''))
|
||||
|
||||
# Year
|
||||
if 'TDRC' in id3:
|
||||
tags['year'] = str(id3['TDRC'])
|
||||
|
||||
# Track number
|
||||
if 'TRCK' in id3:
|
||||
track_str = str(id3['TRCK'])
|
||||
try:
|
||||
tags['track_number'] = int(track_str.split('/')[0]) if '/' in track_str else int(track_str)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Disc number
|
||||
if 'TPOS' in id3:
|
||||
disc_str = str(id3['TPOS'])
|
||||
try:
|
||||
tags['disc_number'] = int(disc_str.split('/')[0]) if '/' in disc_str else int(disc_str)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Check for cover art
|
||||
tags['has_cover'] = any(key.startswith('APIC') for key in id3.keys())
|
||||
|
||||
return tags
|
||||
|
||||
def _read_vorbis_tags(self, audio) -> Dict[str, Any]:
|
||||
"""Read tags from Vorbis comment format (FLAC, OGG, Opus)"""
|
||||
tags = {
|
||||
'title': audio.get('title', [''])[0],
|
||||
'artist': audio.get('artist', [''])[0],
|
||||
'album': audio.get('album', [''])[0],
|
||||
'album_artist': audio.get('albumartist', [''])[0] or audio.get('album artist', [''])[0],
|
||||
'year': audio.get('date', [''])[0] or audio.get('year', [''])[0],
|
||||
'genre': audio.get('genre', [''])[0],
|
||||
'has_cover': False,
|
||||
}
|
||||
|
||||
# Check for embedded pictures (FLAC)
|
||||
if hasattr(audio, 'pictures'):
|
||||
tags['has_cover'] = len(audio.pictures) > 0
|
||||
elif 'metadata_block_picture' in audio:
|
||||
tags['has_cover'] = True
|
||||
|
||||
# Track number
|
||||
track = audio.get('tracknumber', [''])[0]
|
||||
if track:
|
||||
try:
|
||||
tags['track_number'] = int(track.split('/')[0]) if '/' in track else int(track)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Disc number
|
||||
disc = audio.get('discnumber', [''])[0]
|
||||
if disc:
|
||||
try:
|
||||
tags['disc_number'] = int(disc.split('/')[0]) if '/' in disc else int(disc)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return tags
|
||||
|
||||
def _read_apev2_tags(self, audio) -> Dict[str, Any]:
|
||||
"""Read tags from APEv2 format (WavPack, APE, Musepack)"""
|
||||
tags = {
|
||||
'title': str(audio.get('Title', [''])[0]) if audio.get('Title') else '',
|
||||
'artist': str(audio.get('Artist', [''])[0]) if audio.get('Artist') else '',
|
||||
'album': str(audio.get('Album', [''])[0]) if audio.get('Album') else '',
|
||||
'album_artist': str(audio.get('Album Artist', [''])[0]) if audio.get('Album Artist') else '',
|
||||
'year': str(audio.get('Year', [''])[0]) if audio.get('Year') else '',
|
||||
'genre': str(audio.get('Genre', [''])[0]) if audio.get('Genre') else '',
|
||||
'has_cover': audio.get('Cover Art (Front)') is not None,
|
||||
}
|
||||
|
||||
# Track number
|
||||
track = audio.get('Track')
|
||||
if track:
|
||||
track_str = str(track[0]) if isinstance(track, list) else str(track)
|
||||
try:
|
||||
tags['track_number'] = int(track_str.split('/')[0]) if '/' in track_str else int(track_str)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Disc number
|
||||
disc = audio.get('Disc')
|
||||
if disc:
|
||||
disc_str = str(disc[0]) if isinstance(disc, list) else str(disc)
|
||||
try:
|
||||
tags['disc_number'] = int(disc_str.split('/')[0]) if '/' in disc_str else int(disc_str)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return tags
|
||||
|
||||
def _read_dsf_tags(self, audio: DSF) -> Dict[str, Any]:
|
||||
"""Read tags from DSF file (DSD Stream File)"""
|
||||
# DSF uses ID3v2 tags
|
||||
if hasattr(audio, 'tags') and audio.tags:
|
||||
return self._read_id3_tags(audio)
|
||||
return self._empty_tags()
|
||||
|
||||
def _read_dsdiff_tags(self, audio: DSDIFF) -> Dict[str, Any]:
|
||||
"""Read tags from DSDIFF/DFF file"""
|
||||
# DSDIFF uses ID3v2 tags
|
||||
if hasattr(audio, 'tags') and audio.tags:
|
||||
return self._read_id3_tags(audio)
|
||||
return self._empty_tags()
|
||||
|
||||
def _empty_tags(self) -> Dict[str, Any]:
|
||||
"""Return empty tags structure"""
|
||||
return {
|
||||
'title': '',
|
||||
'artist': '',
|
||||
'album': '',
|
||||
'album_artist': '',
|
||||
'year': '',
|
||||
'genre': '',
|
||||
'track_number': None,
|
||||
'disc_number': None,
|
||||
'has_cover': False,
|
||||
}
|
||||
|
||||
def write_tags(self, file_path: str, tags: Dict[str, Any]) -> bool:
|
||||
"""
|
||||
Write tags to audio file
|
||||
|
||||
Args:
|
||||
file_path: Path to audio file
|
||||
tags: Dictionary with tag values
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
logger.error(f"File not found: {file_path}")
|
||||
return False
|
||||
|
||||
suffix = path.suffix.lower()
|
||||
format_type = self.SUPPORTED_FORMATS.get(suffix)
|
||||
|
||||
if not format_type:
|
||||
logger.warning(f"Unsupported audio format for writing: {suffix}")
|
||||
return False
|
||||
|
||||
# Handle different audio formats
|
||||
if format_type == 'MP4':
|
||||
audio = MP4(file_path)
|
||||
self._write_mp4_tags(audio, tags)
|
||||
elif format_type == 'MP3':
|
||||
audio = MP3(file_path)
|
||||
if audio.tags is None:
|
||||
audio.add_tags()
|
||||
self._write_id3_tags(audio.tags, tags)
|
||||
elif format_type in ['FLAC', 'OGG', 'OPUS']:
|
||||
if format_type == 'FLAC':
|
||||
audio = FLAC(file_path)
|
||||
elif format_type == 'OGG':
|
||||
audio = OggVorbis(file_path)
|
||||
else:
|
||||
audio = OggOpus(file_path)
|
||||
self._write_vorbis_tags(audio, tags)
|
||||
elif format_type in ['WAVPACK', 'APE', 'MUSEPACK']:
|
||||
if format_type == 'WAVPACK':
|
||||
audio = WavPack(file_path)
|
||||
elif format_type == 'APE':
|
||||
audio = MonkeysAudio(file_path)
|
||||
else:
|
||||
audio = Musepack(file_path)
|
||||
self._write_apev2_tags(audio, tags)
|
||||
elif format_type in ['DSF', 'DSDIFF']:
|
||||
if format_type == 'DSF':
|
||||
audio = DSF(file_path)
|
||||
else:
|
||||
audio = DSDIFF(file_path)
|
||||
if audio.tags is None:
|
||||
audio.add_tags()
|
||||
self._write_id3_tags(audio.tags, tags)
|
||||
elif format_type in ['AIFF', 'WAVE']:
|
||||
if format_type == 'AIFF':
|
||||
audio = AIFF(file_path)
|
||||
else:
|
||||
audio = WAVE(file_path)
|
||||
if audio.tags is None:
|
||||
audio.add_tags()
|
||||
self._write_id3_tags(audio.tags, tags)
|
||||
else:
|
||||
logger.warning(f"Write not implemented for: {format_type}")
|
||||
return False
|
||||
|
||||
audio.save()
|
||||
logger.info(f"Successfully wrote tags to {file_path}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing tags to {file_path}: {e}")
|
||||
return False
|
||||
|
||||
def _write_mp4_tags(self, audio: MP4, tags: Dict[str, Any]) -> None:
|
||||
"""Write tags to MP4/M4A file"""
|
||||
if 'title' in tags:
|
||||
audio['\xa9nam'] = tags['title']
|
||||
if 'artist' in tags:
|
||||
audio['\xa9ART'] = tags['artist']
|
||||
if 'album' in tags:
|
||||
audio['\xa9alb'] = tags['album']
|
||||
if 'album_artist' in tags:
|
||||
audio['aART'] = tags['album_artist']
|
||||
if 'year' in tags:
|
||||
audio['\xa9day'] = tags['year']
|
||||
if 'genre' in tags:
|
||||
audio['\xa9gen'] = tags['genre']
|
||||
if 'track_number' in tags:
|
||||
current_disc = audio.get('trkn', [(0, 0)])[0]
|
||||
audio['trkn'] = [(tags['track_number'], current_disc[1] if current_disc else 0)]
|
||||
if 'disc_number' in tags:
|
||||
current_disc = audio.get('disk', [(0, 0)])[0]
|
||||
audio['disk'] = [(tags['disc_number'], current_disc[1] if current_disc else 0)]
|
||||
|
||||
def _write_id3_tags(self, id3: ID3, tags: Dict[str, Any]) -> None:
|
||||
"""Write tags to ID3 format"""
|
||||
if 'title' in tags:
|
||||
id3.add(TIT2(encoding=3, text=tags['title']))
|
||||
if 'artist' in tags:
|
||||
id3.add(TPE1(encoding=3, text=tags['artist']))
|
||||
if 'album' in tags:
|
||||
id3.add(TALB(encoding=3, text=tags['album']))
|
||||
if 'album_artist' in tags:
|
||||
id3.add(TPE2(encoding=3, text=tags['album_artist']))
|
||||
if 'year' in tags:
|
||||
id3.add(TDRC(encoding=3, text=tags['year']))
|
||||
if 'genre' in tags:
|
||||
id3.add(TCON(encoding=3, text=tags['genre']))
|
||||
if 'track_number' in tags:
|
||||
id3.add(TRCK(encoding=3, text=str(tags['track_number'])))
|
||||
if 'disc_number' in tags:
|
||||
id3.add(TPOS(encoding=3, text=str(tags['disc_number'])))
|
||||
|
||||
def _write_vorbis_tags(self, audio, tags: Dict[str, Any]) -> None:
|
||||
"""Write tags to Vorbis comment format (FLAC, OGG, Opus)"""
|
||||
if 'title' in tags:
|
||||
audio['title'] = tags['title']
|
||||
if 'artist' in tags:
|
||||
audio['artist'] = tags['artist']
|
||||
if 'album' in tags:
|
||||
audio['album'] = tags['album']
|
||||
if 'album_artist' in tags:
|
||||
audio['albumartist'] = tags['album_artist']
|
||||
if 'year' in tags:
|
||||
audio['date'] = tags['year']
|
||||
if 'genre' in tags:
|
||||
audio['genre'] = tags['genre']
|
||||
if 'track_number' in tags:
|
||||
audio['tracknumber'] = str(tags['track_number'])
|
||||
if 'disc_number' in tags:
|
||||
audio['discnumber'] = str(tags['disc_number'])
|
||||
|
||||
def _write_apev2_tags(self, audio, tags: Dict[str, Any]) -> None:
|
||||
"""Write tags to APEv2 format (WavPack, APE, Musepack)"""
|
||||
if 'title' in tags:
|
||||
audio['Title'] = tags['title']
|
||||
if 'artist' in tags:
|
||||
audio['Artist'] = tags['artist']
|
||||
if 'album' in tags:
|
||||
audio['Album'] = tags['album']
|
||||
if 'album_artist' in tags:
|
||||
audio['Album Artist'] = tags['album_artist']
|
||||
if 'year' in tags:
|
||||
audio['Year'] = tags['year']
|
||||
if 'genre' in tags:
|
||||
audio['Genre'] = tags['genre']
|
||||
if 'track_number' in tags:
|
||||
audio['Track'] = str(tags['track_number'])
|
||||
if 'disc_number' in tags:
|
||||
audio['Disc'] = str(tags['disc_number'])
|
||||
|
||||
def embed_cover_art(self, file_path: str, image_data: bytes, mime_type: str = 'image/jpeg') -> bool:
|
||||
"""
|
||||
Embed cover art in audio file
|
||||
|
||||
Args:
|
||||
file_path: Path to audio file
|
||||
image_data: Image data as bytes
|
||||
mime_type: MIME type of image (image/jpeg or image/png)
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
logger.error(f"File not found: {file_path}")
|
||||
return False
|
||||
|
||||
suffix = path.suffix.lower()
|
||||
format_type = self.SUPPORTED_FORMATS.get(suffix)
|
||||
|
||||
if not format_type:
|
||||
logger.warning(f"Unsupported format for cover art: {suffix}")
|
||||
return False
|
||||
|
||||
# Handle different audio formats
|
||||
if format_type == 'MP4':
|
||||
audio = MP4(file_path)
|
||||
if mime_type == 'image/png':
|
||||
cover = MP4Cover(image_data, imageformat=MP4Cover.FORMAT_PNG)
|
||||
else:
|
||||
cover = MP4Cover(image_data, imageformat=MP4Cover.FORMAT_JPEG)
|
||||
audio['covr'] = [cover]
|
||||
elif format_type in ['MP3', 'AIFF', 'WAVE', 'DSF', 'DSDIFF']:
|
||||
# All these formats support ID3v2
|
||||
if format_type == 'MP3':
|
||||
audio = MP3(file_path)
|
||||
elif format_type == 'DSF':
|
||||
audio = DSF(file_path)
|
||||
elif format_type == 'DSDIFF':
|
||||
audio = DSDIFF(file_path)
|
||||
elif format_type == 'AIFF':
|
||||
audio = AIFF(file_path)
|
||||
else:
|
||||
audio = WAVE(file_path)
|
||||
|
||||
if audio.tags is None:
|
||||
audio.add_tags()
|
||||
|
||||
# Remove existing APIC frames
|
||||
audio.tags.delall('APIC')
|
||||
|
||||
# Add new cover
|
||||
audio.tags.add(APIC(
|
||||
encoding=3,
|
||||
mime=mime_type,
|
||||
type=3, # Cover (front)
|
||||
desc='Cover',
|
||||
data=image_data
|
||||
))
|
||||
elif format_type in ['FLAC', 'OGG', 'OPUS']:
|
||||
if format_type == 'FLAC':
|
||||
audio = FLAC(file_path)
|
||||
elif format_type == 'OGG':
|
||||
audio = OggVorbis(file_path)
|
||||
else:
|
||||
audio = OggOpus(file_path)
|
||||
|
||||
# Create picture
|
||||
picture = Picture()
|
||||
picture.type = 3 # Cover (front)
|
||||
picture.mime = mime_type
|
||||
picture.desc = 'Cover'
|
||||
picture.data = image_data
|
||||
|
||||
if format_type == 'FLAC':
|
||||
# FLAC has native picture support
|
||||
audio.clear_pictures()
|
||||
audio.add_picture(picture)
|
||||
else:
|
||||
# OGG/Opus use base64 encoded metadata block
|
||||
if 'metadata_block_picture' in audio:
|
||||
del audio['metadata_block_picture']
|
||||
encoded = base64.b64encode(picture.write()).decode('ascii')
|
||||
audio['metadata_block_picture'] = [encoded]
|
||||
elif format_type in ['WAVPACK', 'APE', 'MUSEPACK']:
|
||||
if format_type == 'WAVPACK':
|
||||
audio = WavPack(file_path)
|
||||
elif format_type == 'APE':
|
||||
audio = MonkeysAudio(file_path)
|
||||
else:
|
||||
audio = Musepack(file_path)
|
||||
|
||||
# APEv2 stores cover as binary item
|
||||
audio['Cover Art (Front)'] = image_data
|
||||
else:
|
||||
logger.warning(f"Cover art embedding not implemented for: {format_type}")
|
||||
return False
|
||||
|
||||
audio.save()
|
||||
logger.info(f"Successfully embedded cover art in {file_path}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error embedding cover art in {file_path}: {e}")
|
||||
return False
|
||||
|
||||
def extract_cover_art(self, file_path: str) -> Optional[bytes]:
|
||||
"""
|
||||
Extract cover art from audio file
|
||||
|
||||
Args:
|
||||
file_path: Path to audio file
|
||||
|
||||
Returns:
|
||||
Cover art data as bytes or None if not found
|
||||
"""
|
||||
try:
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
logger.error(f"File not found: {file_path}")
|
||||
return None
|
||||
|
||||
suffix = path.suffix.lower()
|
||||
format_type = self.SUPPORTED_FORMATS.get(suffix)
|
||||
|
||||
if not format_type:
|
||||
return None
|
||||
|
||||
# Handle different audio formats
|
||||
if format_type == 'MP4':
|
||||
audio = MP4(file_path)
|
||||
covers = audio.get('covr', [])
|
||||
if covers:
|
||||
return bytes(covers[0])
|
||||
elif format_type in ['MP3', 'AIFF', 'WAVE', 'DSF', 'DSDIFF']:
|
||||
if format_type == 'MP3':
|
||||
audio = MP3(file_path)
|
||||
elif format_type == 'DSF':
|
||||
audio = DSF(file_path)
|
||||
elif format_type == 'DSDIFF':
|
||||
audio = DSDIFF(file_path)
|
||||
elif format_type == 'AIFF':
|
||||
audio = AIFF(file_path)
|
||||
else:
|
||||
audio = WAVE(file_path)
|
||||
|
||||
if audio.tags:
|
||||
for key in audio.tags.keys():
|
||||
if key.startswith('APIC'):
|
||||
return audio.tags[key].data
|
||||
elif format_type == 'FLAC':
|
||||
audio = FLAC(file_path)
|
||||
if audio.pictures:
|
||||
return audio.pictures[0].data
|
||||
elif format_type in ['OGG', 'OPUS']:
|
||||
if format_type == 'OGG':
|
||||
audio = OggVorbis(file_path)
|
||||
else:
|
||||
audio = OggOpus(file_path)
|
||||
|
||||
# Check for base64 encoded picture
|
||||
if 'metadata_block_picture' in audio:
|
||||
encoded = audio['metadata_block_picture'][0]
|
||||
picture_data = base64.b64decode(encoded)
|
||||
picture = Picture(picture_data)
|
||||
return picture.data
|
||||
elif format_type in ['WAVPACK', 'APE', 'MUSEPACK']:
|
||||
if format_type == 'WAVPACK':
|
||||
audio = WavPack(file_path)
|
||||
elif format_type == 'APE':
|
||||
audio = MonkeysAudio(file_path)
|
||||
else:
|
||||
audio = Musepack(file_path)
|
||||
|
||||
cover = audio.get('Cover Art (Front)')
|
||||
if cover:
|
||||
return bytes(cover[0]) if isinstance(cover, list) else bytes(cover)
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error extracting cover art from {file_path}: {e}")
|
||||
return None
|
||||
Loading…
Add table
Add a link
Reference in a new issue