"""
Bank Statement Import Module for FINA Finance Tracker
Parses PDF and CSV bank statements and extracts transactions
"""
import re
import csv
import io
from datetime import datetime
from decimal import Decimal

try:
    import PyPDF2
except ImportError:
    # PDF support is optional: CSV parsing must keep working when PyPDF2 is
    # not installed. parse_pdf() reports the missing dependency explicitly.
    PyPDF2 = None


class BankStatementParser:
    """Base parser class for bank statements"""

    # Date shapes searched for in PDF text: DD/MM/YYYY, DD-MM-YYYY,
    # DD.MM.YYYY and YYYY-MM-DD (or YYYY/MM/DD).
    _DATE_PATTERNS = (
        re.compile(r'(\d{2}[/.-]\d{2}[/.-]\d{4})'),
        re.compile(r'(\d{4}[/-]\d{2}[/-]\d{2})'),
    )

    # An amount with exactly two decimals and optional thousands groups
    # (1.234,56 / 1,234.56 / -123.45). The lookarounds stop the pattern from
    # matching a fragment of a longer number or of a dotted date.
    _AMOUNT_RE = re.compile(
        r'(?<![\d.,])-?(?:\d{1,3}(?:[.,]\d{3})+|\d{1,10})[.,]\d{2}(?![.,]?\d)'
    )

    # Lines containing any of these words are treated as headers/summaries.
    _HEADER_WORDS = ('sold', 'balance', 'iban', 'account', 'statement')

    # Bank markers are matched as whole words: a plain substring test for
    # 'ing' would fire on "banking", "closing", etc.
    _BANK_PATTERNS = (
        ('revolut', re.compile(r'revolut')),
        ('ing', re.compile(r'\bing\b')),
        ('bcr', re.compile(r'\bbcr\b|banca comercial[ăa]')),
        ('brd', re.compile(r'\bbrd\b')),
    )

    # Generic words stripped from descriptions (whole words only).
    _NOISE_WORDS_RE = re.compile(
        r'\b(?:transaction|payment|transfer|tranzactie|plata)\b',
        re.IGNORECASE,
    )

    _DATE_FORMATS = (
        '%d/%m/%Y',
        '%d-%m-%Y',
        '%Y-%m-%d',
        '%Y/%m/%d',
        '%d.%m.%Y',
        '%m/%d/%Y',
        '%d %b %Y',
        '%d %B %Y',
    )

    def __init__(self):
        self.transactions = []
        self.detected_format = None
        self.total_transactions = 0
        self.parse_errors = []

    def _record_result(self, transactions, detected_format):
        """Store the outcome of the latest successful parse on the instance."""
        self.transactions = transactions
        self.total_transactions = len(transactions)
        self.detected_format = detected_format

    def parse(self, file_content, file_type):
        """
        Main parse method - detects format and extracts transactions

        Args:
            file_content: File content (bytes for PDF, string for CSV)
            file_type: 'pdf' or 'csv'

        Returns:
            dict with transactions and metadata
        """
        if file_type == 'pdf':
            return self.parse_pdf(file_content)
        if file_type == 'csv':
            return self.parse_csv(file_content)
        return {'success': False, 'error': 'Unsupported file type'}

    def parse_pdf(self, pdf_bytes):
        """
        Parse PDF bank statement
        Extracts transactions using pattern matching

        Args:
            pdf_bytes: Raw PDF content as bytes

        Returns:
            dict with 'success', 'transactions' and, on success,
            'total_found', 'bank_format' and 'parse_errors'; on failure an
            'error' message.
        """
        # Start from a clean slate so errors from an earlier parse on the
        # same instance do not leak into this result.
        self.parse_errors = []

        if PyPDF2 is None:
            return {
                'success': False,
                'error': 'PDF parsing failed: PyPDF2 is not installed',
                'transactions': []
            }

        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))

            # extract_text() may return None for pages without a text layer.
            text = ''.join(
                (page.extract_text() or '') + '\n'
                for page in pdf_reader.pages
            )

            # The detected bank is reported as metadata only; every format
            # currently goes through the generic parser.
            bank_format = self.detect_bank_format(text)
            transactions = self.parse_generic_pdf(text)
            self._record_result(transactions, bank_format)

            return {
                'success': True,
                'transactions': transactions,
                'total_found': len(transactions),
                'bank_format': bank_format,
                'parse_errors': self.parse_errors
            }

        except Exception as e:
            # Boundary handler: any reader/parsing failure becomes an error
            # result instead of propagating to the caller.
            return {
                'success': False,
                'error': f'PDF parsing failed: {str(e)}',
                'transactions': []
            }

    def parse_csv(self, csv_string):
        """
        Parse CSV bank statement
        Auto-detects column mapping

        Args:
            csv_string: Decoded CSV content

        Returns:
            dict with 'success', 'transactions' and, on success,
            'total_found', 'column_mapping' and 'parse_errors'; on failure
            an 'error' message.
        """
        self.parse_errors = []

        try:
            delimiter = self.detect_csv_delimiter(csv_string)

            stream = io.StringIO(csv_string)
            csv_reader = csv.DictReader(stream, delimiter=delimiter)

            # fieldnames is None when the input has no header row at all.
            fieldnames = csv_reader.fieldnames
            if not fieldnames:
                return {
                    'success': False,
                    'error': 'CSV parsing failed: file is empty or has no header row',
                    'transactions': []
                }

            column_map = self.detect_csv_columns(fieldnames)
            if not column_map.get('date'):
                # Without a date column every row is skipped; say so rather
                # than silently returning nothing.
                self.parse_errors.append('No date column detected in CSV header')

            transactions = []
            for row_num, row in enumerate(csv_reader, start=1):
                try:
                    transaction = self.extract_transaction_from_csv_row(row, column_map)
                    if transaction:
                        transactions.append(transaction)
                except Exception as e:
                    # One bad row must not abort the whole import.
                    self.parse_errors.append(f"Row {row_num}: {str(e)}")

            self._record_result(transactions, 'csv')

            return {
                'success': True,
                'transactions': transactions,
                'total_found': len(transactions),
                'column_mapping': column_map,
                'parse_errors': self.parse_errors
            }

        except Exception as e:
            return {
                'success': False,
                'error': f'CSV parsing failed: {str(e)}',
                'transactions': []
            }

    def detect_bank_format(self, text):
        """
        Detect which bank format the PDF uses

        Returns one of 'revolut', 'ing', 'bcr', 'brd' or 'generic'.
        """
        text_lower = text.lower()
        for bank, pattern in self._BANK_PATTERNS:
            if pattern.search(text_lower):
                return bank
        return 'generic'

    def parse_generic_pdf(self, text):
        """
        Parse PDF using generic patterns
        Looks for common transaction patterns across banks

        A line is a transaction candidate when it contains a date; the
        amount is taken from that line or one of the next two lines.

        Returns:
            list of dicts with 'date', 'description', 'amount' (always
            positive), 'type' ('expense'/'income') and 'original_amount'.
        """
        transactions = []
        lines = text.split('\n')

        for i, line in enumerate(lines):
            # Skip header lines
            lowered = line.lower()
            if any(word in lowered for word in self._HEADER_WORDS):
                continue

            # Look for date in line
            date_match = None
            for pattern in self._DATE_PATTERNS:
                match = pattern.search(line)
                if match:
                    date_match = match.group(1)
                    break
            if not date_match:
                continue

            trans_date = self.parse_date(date_match)
            if not trans_date:
                continue

            # Check current line and next 2 lines for amount
            amount = None
            amount_str = ''
            for nearby in lines[i:i + 3]:
                amounts_found = self._AMOUNT_RE.findall(nearby)
                if amounts_found:
                    # Take the last amount (usually the transaction amount)
                    amount_str = amounts_found[-1]
                    amount = self.parse_amount(amount_str)
                    break

            if not amount:
                continue

            # Strip the amount exactly as printed in the statement;
            # str(float) would not match e.g. "-45,50".
            description = self.clean_description(line, date_match, amount_str)

            if description:
                transactions.append({
                    'date': trans_date,
                    'description': description,
                    'amount': abs(amount),  # Always positive, type determined by sign
                    'type': 'expense' if amount < 0 else 'income',
                    'original_amount': amount
                })

        # Deduplicate based on date + amount + description similarity
        return self.deduplicate_transactions(transactions)

    def detect_csv_delimiter(self, csv_string):
        """Detect CSV delimiter (comma, semicolon, tab) from the first line"""
        first_line = csv_string.split('\n', 1)[0]

        comma_count = first_line.count(',')
        semicolon_count = first_line.count(';')
        tab_count = first_line.count('\t')

        if semicolon_count > comma_count and semicolon_count > tab_count:
            return ';'
        if tab_count > comma_count:
            return '\t'
        return ','

    def detect_csv_columns(self, fieldnames):
        """
        Auto-detect which columns contain date, description, amount

        Args:
            fieldnames: Header names from the CSV (may be None/empty)

        Returns:
            dict mapping 'date', 'description', 'amount', 'debit' and
            'credit' to the matching header NAME, or None when not found.
            The first matching header wins for every key.
        """
        fieldnames = list(fieldnames or [])
        fieldnames_lower = [str(f).lower() if f else '' for f in fieldnames]

        column_map = {
            'date': None,
            'description': None,
            'amount': None,
            'debit': None,
            'credit': None
        }

        # Date column keywords
        date_keywords = ['date', 'data', 'fecha', 'datum', 'transaction date']
        for idx, name in enumerate(fieldnames_lower):
            if any(keyword in name for keyword in date_keywords):
                column_map['date'] = fieldnames[idx]
                break

        # Description column keywords
        desc_keywords = ['description', 'descriere', 'descripción', 'details', 'detalii', 'merchant', 'comerciant']
        for idx, name in enumerate(fieldnames_lower):
            if any(keyword in name for keyword in desc_keywords):
                column_map['description'] = fieldnames[idx]
                break

        # Amount columns
        amount_keywords = ['amount', 'suma', 'monto', 'valoare']
        debit_keywords = ['debit', 'withdrawal', 'retragere', 'retiro', 'spent']
        credit_keywords = ['credit', 'deposit', 'depunere', 'ingreso', 'income']

        for idx, name in enumerate(fieldnames_lower):
            # Debit/credit are tested before the generic amount keywords so
            # that headers like "Debit Amount" / "Credit Amount" keep their
            # direction instead of both collapsing into 'amount'.
            if any(keyword in name for keyword in debit_keywords):
                key = 'debit'
            elif any(keyword in name for keyword in credit_keywords):
                key = 'credit'
            elif any(keyword in name for keyword in amount_keywords):
                key = 'amount'
            else:
                continue
            if column_map[key] is None:
                column_map[key] = fieldnames[idx]

        return column_map

    def extract_transaction_from_csv_row(self, row, column_map):
        """
        Extract transaction data from CSV row using column mapping

        Args:
            row: dict for one CSV row (cells may be None for short rows)
            column_map: mapping produced by detect_csv_columns()

        Returns:
            dict with 'date', 'description', 'amount' (positive) and 'type',
            or None when the row has no usable date or a zero amount.
        """
        # Get date
        date_col = column_map.get('date')
        if not date_col:
            return None

        trans_date = self.parse_date(row.get(date_col))
        if not trans_date:
            return None

        # Get description; missing/empty cells fall back to a generic label
        desc_col = column_map.get('description')
        raw_description = row.get(desc_col) if desc_col else None
        description = str(raw_description).strip() if raw_description else ''
        if not description:
            description = 'Transaction'

        amount = 0
        trans_type = 'expense'

        debit_col = column_map.get('debit')
        credit_col = column_map.get('credit')
        amount_col = column_map.get('amount')

        # Separate debit/credit columns are used when both exist, or when
        # there is no single amount column to rely on instead.
        if (debit_col and credit_col) or not amount_col:
            # abs(): some exports write debits as negative numbers.
            debit_val = abs(self.parse_amount(row.get(debit_col))) if debit_col else 0.0
            credit_val = abs(self.parse_amount(row.get(credit_col))) if credit_col else 0.0

            if debit_val > 0:
                amount = debit_val
                trans_type = 'expense'
            elif credit_val > 0:
                amount = credit_val
                trans_type = 'income'

        # Fall back to the signed amount column when debit/credit gave nothing
        if amount == 0 and amount_col:
            amount_val = self.parse_amount(row.get(amount_col))
            amount = abs(amount_val)
            trans_type = 'expense' if amount_val < 0 else 'income'

        if amount == 0:
            return None

        return {
            'date': trans_date,
            'description': description,
            'amount': amount,
            'type': trans_type
        }

    def parse_date(self, date_str):
        """
        Parse date string in various formats

        Accepts the formats in _DATE_FORMATS; a trailing time component
        ("2024-01-15 10:30:00", "2024-01-15T10:30:00") is ignored.

        Returns:
            datetime.date, or None when the value is empty or unparseable.
        """
        if not date_str:
            return None
        date_str = str(date_str).strip()
        if not date_str:
            return None

        # Try the full string first (formats like '%d %b %Y' contain
        # spaces), then only the part before the first space or 'T'.
        candidates = [date_str]
        head = re.split(r'[T\s]', date_str, maxsplit=1)[0]
        if head and head != date_str:
            candidates.append(head)

        for candidate in candidates:
            for fmt in self._DATE_FORMATS:
                try:
                    return datetime.strptime(candidate, fmt).date()
                except ValueError:
                    continue

        return None

    def parse_amount(self, amount_str):
        """
        Parse amount string to float

        Handles European ("1.234,56") and US ("1,234.56") grouping, currency
        text, a leading or trailing minus and accounting parentheses
        ("(12.00)" is negative).

        Returns:
            float amount, or 0.0 when the value is empty or unparseable.
        """
        if not amount_str:
            return 0.0
        if isinstance(amount_str, (int, float, Decimal)) and not isinstance(amount_str, bool):
            return float(amount_str)

        # Normalise the Unicode minus sign before filtering characters
        text = str(amount_str).strip().replace('\u2212', '-')
        negative = text.startswith('(') and text.endswith(')')

        # Remove currency symbols and whitespace
        text = re.sub(r'[^\d.,-]', '', text)
        if not text:
            return 0.0

        if text.endswith('-'):
            negative = True
            text = text.rstrip('-')
        if text.startswith('-'):
            negative = True
            text = text.lstrip('-')

        # A trailing separator is punctuation left over from the source text
        text = text.rstrip('.,')

        if ',' in text and '.' in text:
            # Whichever separator comes last is the decimal separator
            if text.rfind(',') > text.rfind('.'):
                text = text.replace('.', '').replace(',', '.')
            else:
                text = text.replace(',', '')
        elif text.count(',') > 1:
            # 1,234,567 -> commas are thousands separators
            text = text.replace(',', '')
        elif ',' in text:
            # 1234,56 -> comma is the decimal separator (European format)
            text = text.replace(',', '.')
        elif text.count('.') > 1:
            # 1.234.567 -> dots are thousands separators
            text = text.replace('.', '')

        try:
            value = float(text)
        except ValueError:
            return 0.0

        return -value if negative and value else value

    def clean_description(self, text, date_str, amount_str):
        """
        Clean transaction description by removing date and amount

        Also drops generic whole words (payment, transfer, ...) and
        collapses whitespace. Returns 'Bank Transaction' when fewer than 3
        characters remain; the result is capped at 200 characters.
        """
        if date_str:
            text = text.replace(date_str, '')
        if amount_str:
            text = text.replace(amount_str, '')

        # Remove common keywords as whole words only, so "Prepayment" is
        # left intact.
        text = self._NOISE_WORDS_RE.sub(' ', text)

        # Collapse whitespace last so removed words leave no double spaces
        text = ' '.join(text.split())

        # If too short, return generic
        if len(text) < 3:
            return 'Bank Transaction'

        return text[:200]  # Limit length

    def deduplicate_transactions(self, transactions):
        """
        Remove duplicate transactions

        Two transactions are duplicates when date, type, rounded amount and
        the first 20 characters of the description (case-insensitive) match.
        The first occurrence is kept and order is preserved.
        """
        seen = set()
        unique = []

        for trans in transactions:
            # 'type' is part of the signature so an expense and a same-day
            # refund of the same amount are not merged.
            signature = (
                trans['date'].isoformat(),
                trans.get('type'),
                round(trans['amount'], 2),
                trans['description'][:20].lower()
            )

            if signature not in seen:
                seen.add(signature)
                unique.append(trans)

        return unique

    def validate_file(self, file_content, file_type, max_size_mb=10):
        """
        Validate uploaded file

        Args:
            file_content: File content bytes
            file_type: 'pdf' or 'csv'
            max_size_mb: Maximum file size in MB

        Returns:
            (is_valid, error_message)
        """
        if not file_content:
            return False, 'File is empty'

        # Check file size
        size_mb = len(file_content) / (1024 * 1024)
        if size_mb > max_size_mb:
            return False, f'File too large. Maximum size is {max_size_mb}MB'

        # Check file type
        if file_type == 'pdf':
            # Check PDF header
            if not file_content.startswith(b'%PDF'):
                return False, 'Invalid PDF file'

        elif file_type == 'csv':
            # Decoding cannot be used as the check: latin-1 accepts every
            # byte sequence. NUL bytes are the practical sign of binary or
            # UTF-16 data that the CSV parser cannot handle.
            if b'\x00' in file_content:
                return False, 'Invalid CSV file encoding'

        else:
            return False, 'Unsupported file type. Use PDF or CSV'

        return True, None


def parse_bank_statement(file_content, filename):
    """
    Main entry point for bank statement parsing

    Args:
        file_content: File content as bytes
        filename: Original filename

    Returns:
        Parse results dictionary
    """
    parser = BankStatementParser()

    # Determine file type from the text after the last dot
    file_ext = (filename or '').lower().rpartition('.')[2]
    if file_ext not in ('pdf', 'csv'):
        return {
            'success': False,
            'error': 'Unsupported file type. Please upload PDF or CSV files.'
        }
    file_type = file_ext

    # Validate before decoding so oversized or invalid uploads are rejected
    # without further work.
    is_valid, error_msg = parser.validate_file(file_content, file_type)
    if not is_valid:
        return {'success': False, 'error': error_msg}

    if file_type == 'csv':
        try:
            # utf-8-sig also strips a leading BOM from the first header
            content = file_content.decode('utf-8-sig')
        except UnicodeDecodeError:
            content = file_content.decode('latin-1', errors='ignore')
    else:
        content = file_content

    return parser.parse(content, file_type)