/**
 * Centralized Log Aggregation System (SIEM)
 * Consolidates logs from multiple sources into a protected repository
 * Provides holistic visibility across the infrastructure
 */

const logger = require('./logger');
const { db } = require('../database/db');
const crypto = require('crypto');
const fs = require('fs').promises;
const path = require('path');

// Columns of aggregated_logs that query() accepts in ORDER BY. Identifiers
// cannot be bound as SQL parameters, so they are validated against this list
// before being interpolated into the statement.
const SORTABLE_COLUMNS = new Set([
  'id',
  'log_id',
  'source',
  'level',
  'category',
  'message',
  'metadata',
  'user_id',
  'ip_address',
  'user_agent',
  'signature',
  'timestamp',
  'created_at'
]);

// Sort directions accepted by query().
const SORT_DIRECTIONS = new Set(['ASC', 'DESC']);

// Each inserted row binds 11 parameters. 50 rows = 550 bound variables, which
// stays below SQLite's historical default limit of 999 variables per statement.
const MAX_ROWS_PER_INSERT = 50;

// Upper bound on the number of ids bound into a single IN (...) list.
const MAX_IDS_PER_QUERY = 500;

// Secondary indexes created after the table exists.
const INDEX_STATEMENTS = [
  `CREATE INDEX IF NOT EXISTS idx_aggregated_logs_source ON aggregated_logs(source, timestamp DESC)`,
  `CREATE INDEX IF NOT EXISTS idx_aggregated_logs_level ON aggregated_logs(level, timestamp DESC)`,
  `CREATE INDEX IF NOT EXISTS idx_aggregated_logs_category ON aggregated_logs(category, timestamp DESC)`,
  `CREATE INDEX IF NOT EXISTS idx_aggregated_logs_user ON aggregated_logs(user_id, timestamp DESC)`,
  `CREATE INDEX IF NOT EXISTS idx_aggregated_logs_timestamp ON aggregated_logs(timestamp DESC)`
];

/**
 * Promise wrapper around db.run.
 * @param {String} sql - Statement to execute
 * @param {Array} params - Bound parameters
 * @returns {Promise<{changes: Number, lastID: Number}>} Statement result info
 */
function runStatement(sql, params = []) {
  return new Promise((resolve, reject) => {
    // A regular function (not an arrow) is required: the driver exposes
    // `changes` / `lastID` on the callback's `this`.
    db.run(sql, params, function onComplete(err) {
      if (err) reject(err);
      else resolve({ changes: this.changes, lastID: this.lastID });
    });
  });
}

/**
 * Promise wrapper around db.all.
 * @param {String} sql - Query to execute
 * @param {Array} params - Bound parameters
 * @returns {Promise<Array<Object>>} Result rows
 */
function selectAll(sql, params = []) {
  return new Promise((resolve, reject) => {
    db.all(sql, params, (err, rows) => {
      if (err) reject(err);
      else resolve(rows);
    });
  });
}

/**
 * Insert a group of log entries with a single multi-row INSERT.
 * @param {Array<Object>} entries - Log entries (at most MAX_ROWS_PER_INSERT)
 * @returns {Promise<Object>} Statement result info
 */
function insertRows(entries) {
  const placeholders = entries.map(() => '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)').join(',');
  const values = entries.flatMap(entry => [
    entry.log_id,
    entry.source,
    entry.level,
    entry.category,
    entry.message,
    entry.metadata,
    entry.user_id,
    entry.ip_address,
    entry.user_agent,
    entry.signature,
    entry.timestamp
  ]);

  return runStatement(
    `INSERT INTO aggregated_logs (log_id, source, level, category, message, metadata, user_id, ip_address, user_agent, signature, timestamp) VALUES ${placeholders}`,
    values
  );
}

/**
 * Buffers log entries in memory, signs them with an HMAC and writes them to
 * the aggregated_logs table in batches.
 */
class LogAggregator {
  constructor() {
    this.logSources = new Map();
    this.aggregationBuffer = [];
    this.bufferSize = 100; // Batch size for bulk insert
    this.flushInterval = 5000; // 5 seconds
    this.flushTimer = null; // Handle of the periodic flush timer, once started

    // A constructor cannot await, so failures are handled here instead of
    // surfacing as an unhandled promise rejection.
    this.initializeAggregation().catch((err) => {
      logger.error('[LogAggregator] Initialization failed:', err);
    });
  }

  /**
   * Initialize aggregation system
   * Creates the table, then starts the periodic flush. The timer is not
   * started when table creation fails.
   * @returns {Promise<void>}
   */
  async initializeAggregation() {
    // Create aggregated_logs table if not exists
    await this.createAggregatedLogsTable();

    // Start periodic flush (flushBuffer handles its own errors)
    this.flushTimer = setInterval(() => {
      this.flushBuffer();
    }, this.flushInterval);

    logger.info('[LogAggregator] Initialized - SIEM mode active');
  }

  /**
   * Create database table for aggregated logs
   * Rejects when the table cannot be created. Index creation failures are
   * logged but do not reject, since the indexes only speed up querying.
   * @returns {Promise<void>}
   */
  async createAggregatedLogsTable() {
    try {
      await runStatement(`
        CREATE TABLE IF NOT EXISTS aggregated_logs (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          log_id TEXT UNIQUE NOT NULL,
          source TEXT NOT NULL,
          level TEXT NOT NULL,
          category TEXT NOT NULL,
          message TEXT NOT NULL,
          metadata TEXT,
          user_id INTEGER,
          ip_address TEXT,
          user_agent TEXT,
          signature TEXT NOT NULL,
          timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
          created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
      `);
    } catch (err) {
      logger.error('[LogAggregator] Failed to create aggregated_logs table:', err);
      throw err;
    }

    // Create indexes for fast querying
    for (const statement of INDEX_STATEMENTS) {
      try {
        await runStatement(statement);
      } catch (err) {
        logger.error('[LogAggregator] Failed to create index:', err);
      }
    }
  }

  /**
   * Register a log source
   * @param {String} sourceName - Name of the log source
   * @param {Object} config - Source configuration (enabled, priority,
   *   retention in days, plus any extra keys, which are stored as-is)
   */
  registerSource(sourceName, config = {}) {
    // Spread config first so the normalised fields below always win; otherwise
    // an explicit `undefined` or a stray `name` key would override them.
    this.logSources.set(sourceName, {
      ...config,
      name: sourceName,
      enabled: config.enabled !== false,
      priority: config.priority || 'medium',
      retention: config.retention || 90 // days
    });

    logger.info(`[LogAggregator] Registered source: ${sourceName}`);
  }

  /**
   * Aggregate log entry with cryptographic signature
   * The entry is buffered; it is written when the buffer reaches bufferSize
   * or on the next periodic flush.
   * @param {String} source - Log source identifier
   * @param {String} level - Log level (info, warn, error, critical)
   * @param {String} category - Log category (auth, access, security, system, application)
   * @param {String} message - Log message
   * @param {Object} details - Additional details (userId, ip, userAgent are
   *   copied to their own columns; everything is stored as JSON metadata)
   * @returns {Promise<String>} The generated log id
   */
  async aggregate(source, level, category, message, details = {}) {
    // The default parameter only covers `undefined`; tolerate an explicit null.
    const info = details === null || details === undefined ? {} : details;
    const logId = this.generateLogId();
    const timestamp = new Date().toISOString();

    let metadata;
    try {
      metadata = JSON.stringify({ ...info, aggregatedAt: timestamp });
    } catch (err) {
      // JSON.stringify throws on circular structures and BigInt values; keep
      // the log entry rather than losing it over unserialisable details.
      metadata = JSON.stringify({
        aggregatedAt: timestamp,
        serializationError: String(err && err.message)
      });
    }

    const logEntry = {
      log_id: logId,
      source,
      level,
      category,
      message,
      metadata,
      // Keep a user id of 0; other falsy values are stored as NULL.
      user_id: info.userId === 0 || info.userId ? info.userId : null,
      ip_address: info.ip || null,
      user_agent: info.userAgent || null,
      timestamp
    };

    // Generate cryptographic signature for log integrity
    logEntry.signature = this.generateSignature(logEntry);

    // Add to buffer
    this.aggregationBuffer.push(logEntry);

    // Flush if buffer is full
    if (this.aggregationBuffer.length >= this.bufferSize) {
      await this.flushBuffer();
    }

    return logId;
  }

  /**
   * Generate unique log ID
   * @returns {String} `LOG-<epoch millis>-<16 hex chars>`
   */
  generateLogId() {
    const timestamp = Date.now();
    const random = crypto.randomBytes(8).toString('hex');
    return `LOG-${timestamp}-${random}`;
  }

  /**
   * Generate cryptographic signature for log entry
   * SHA-256 HMAC with secret key for integrity verification.
   * The signed fields are log_id, source, level, category, message and
   * timestamp; metadata, user_id, ip_address and user_agent are not covered.
   * @param {Object} logEntry - Entry (or database row) to sign
   * @returns {String} Hex-encoded HMAC
   */
  generateSignature(logEntry) {
    // NOTE(review): when LOG_SIGNATURE_SECRET is unset this falls back to a
    // hard-coded secret, so signatures can be forged by anyone who has read
    // this file. Left unchanged so existing signatures keep verifying.
    const secret = process.env.LOG_SIGNATURE_SECRET || 'default-secret-change-in-production';
    const data = `${logEntry.log_id}|${logEntry.source}|${logEntry.level}|${logEntry.category}|${logEntry.message}|${logEntry.timestamp}`;

    return crypto
      .createHmac('sha256', secret)
      .update(data)
      .digest('hex');
  }

  /**
   * Verify log entry signature
   * @param {Object} logEntry - Entry (or database row) carrying a `signature`
   * @returns {Boolean} true when the stored signature matches the recomputed one
   */
  verifySignature(logEntry) {
    if (typeof logEntry.signature !== 'string') return false;

    const expected = Buffer.from(this.generateSignature(logEntry), 'utf8');
    const actual = Buffer.from(logEntry.signature, 'utf8');

    // timingSafeEqual throws on length mismatch, so compare lengths first.
    return expected.length === actual.length && crypto.timingSafeEqual(expected, actual);
  }

  /**
   * Flush aggregation buffer to database
   * Never rejects: on failure the error is logged and the entries that were
   * not written are put back at the front of the buffer for the next flush.
   * @returns {Promise<void>}
   */
  async flushBuffer() {
    if (this.aggregationBuffer.length === 0) return;

    // Take ownership of the current buffer; entries aggregated while the
    // insert is in flight go into the fresh array.
    const batch = this.aggregationBuffer;
    this.aggregationBuffer = [];

    try {
      await this.bulkInsert(batch);
      logger.debug(`[LogAggregator] Flushed ${batch.length} log entries`);
    } catch (error) {
      logger.error('[LogAggregator] Failed to flush buffer:', error);

      // Re-add only the entries that were not written. Re-inserting rows that
      // already landed would violate the UNIQUE constraint on log_id.
      const written = error && Number.isInteger(error.insertedCount) ? error.insertedCount : 0;
      this.aggregationBuffer = batch.slice(written).concat(this.aggregationBuffer);
    }
  }

  /**
   * Bulk insert log entries
   * Entries are written in groups of MAX_ROWS_PER_INSERT so a statement never
   * exceeds SQLite's bound-variable limit. When a group fails, the rejection
   * error carries `insertedCount`: the number of leading entries already
   * written by earlier groups.
   * NOTE: a row that permanently violates a constraint keeps failing its
   * group on every retry.
   * @param {Array<Object>} entries - Log entries to insert
   * @returns {Promise<void>}
   */
  async bulkInsert(entries) {
    if (entries.length === 0) return;

    let insertedCount = 0;
    try {
      for (let start = 0; start < entries.length; start += MAX_ROWS_PER_INSERT) {
        const group = entries.slice(start, start + MAX_ROWS_PER_INSERT);
        await insertRows(group);
        insertedCount += group.length;
      }
    } catch (err) {
      if (err && typeof err === 'object') {
        err.insertedCount = insertedCount;
      }
      throw err;
    }
  }

  /**
   * Query aggregated logs
   * @param {Object} filters - Optional filters: source, level (string or
   *   array), category, userId, startDate, endDate, limit (default 1000),
   *   offset (default 0), orderBy (a column of aggregated_logs, default
   *   'timestamp'), order ('ASC' or 'DESC', case-insensitive, default 'DESC')
   * @returns {Promise<Array<Object>>} Matching rows
   * @throws {Error} When orderBy or order is not an allowed value
   */
  async query(filters = {}) {
    const {
      source,
      level,
      category,
      userId,
      startDate,
      endDate,
      limit = 1000,
      offset = 0,
      orderBy = 'timestamp',
      order = 'DESC'
    } = filters;

    // orderBy / order end up in the SQL text, so only known values are allowed.
    if (!SORTABLE_COLUMNS.has(orderBy)) {
      throw new Error(`Invalid orderBy column: ${orderBy}`);
    }
    const direction = String(order).toUpperCase();
    if (!SORT_DIRECTIONS.has(direction)) {
      throw new Error(`Invalid sort order: ${order}`);
    }

    const whereClause = [];
    const params = [];

    if (source) {
      whereClause.push('source = ?');
      params.push(source);
    }

    if (level) {
      if (Array.isArray(level)) {
        whereClause.push(`level IN (${level.map(() => '?').join(',')})`);
        params.push(...level);
      } else {
        whereClause.push('level = ?');
        params.push(level);
      }
    }

    if (category) {
      whereClause.push('category = ?');
      params.push(category);
    }

    // A user id of 0 is a valid filter and must not be skipped.
    if (userId === 0 || userId) {
      whereClause.push('user_id = ?');
      params.push(userId);
    }

    if (startDate) {
      whereClause.push('timestamp >= ?');
      params.push(startDate);
    }

    if (endDate) {
      whereClause.push('timestamp <= ?');
      params.push(endDate);
    }

    const where = whereClause.length > 0 ? `WHERE ${whereClause.join(' AND ')}` : '';
    params.push(limit, offset);

    return selectAll(
      `SELECT * FROM aggregated_logs ${where} ORDER BY ${orderBy} ${direction} LIMIT ? OFFSET ?`,
      params
    );
  }

  /**
   * Get log statistics
   * @param {Number} timeRange - Look-back window in hours
   * @returns {Promise<Object>} Totals plus counts by source, level and
   *   category, and the raw grouped rows in `breakdown`
   */
  async getStatistics(timeRange = 24) {
    const startTime = new Date(Date.now() - timeRange * 60 * 60 * 1000).toISOString();

    const rows = await selectAll(
      `SELECT source, level, category, COUNT(*) as count, MIN(timestamp) as first_seen, MAX(timestamp) as last_seen FROM aggregated_logs WHERE timestamp >= ? GROUP BY source, level, category ORDER BY count DESC`,
      [startTime]
    );

    const stats = {
      timeRange: `${timeRange} hours`,
      totalLogs: rows.reduce((sum, row) => sum + row.count, 0),
      bySource: {},
      byLevel: {},
      byCategory: {},
      breakdown: rows
    };

    rows.forEach(row => {
      // By source
      stats.bySource[row.source] = (stats.bySource[row.source] || 0) + row.count;
      // By level
      stats.byLevel[row.level] = (stats.byLevel[row.level] || 0) + row.count;
      // By category
      stats.byCategory[row.category] = (stats.byCategory[row.category] || 0) + row.count;
    });

    return stats;
  }

  /**
   * Verify log integrity
   * Checks if log entries have been tampered with by recomputing each row's
   * signature.
   * @param {Array<String>|null} logIds - Ids to check; when omitted the 1000
   *   most recent rows are checked. Ids with no matching row are not reported.
   * @returns {Promise<Object>} { total, verified, tampered, tamperedLogs }
   */
  async verifyIntegrity(logIds = null) {
    let rows;

    if (logIds) {
      // Look ids up in groups so the IN (...) list stays within SQLite's
      // bound-variable limit.
      rows = [];
      for (let start = 0; start < logIds.length; start += MAX_IDS_PER_QUERY) {
        const ids = logIds.slice(start, start + MAX_IDS_PER_QUERY);
        const found = await selectAll(
          `SELECT * FROM aggregated_logs WHERE log_id IN (${ids.map(() => '?').join(',')})`,
          ids
        );
        rows.push(...found);
      }
    } else {
      rows = await selectAll(`SELECT * FROM aggregated_logs ORDER BY timestamp DESC LIMIT 1000`);
    }

    const results = {
      total: rows.length,
      verified: 0,
      tampered: 0,
      tamperedLogs: []
    };

    rows.forEach(row => {
      if (this.verifySignature(row)) {
        results.verified++;
      } else {
        results.tampered++;
        results.tamperedLogs.push({
          log_id: row.log_id,
          timestamp: row.timestamp,
          source: row.source
        });
      }
    });

    return results;
  }

  /**
   * Cleanup old logs based on retention policy
   * @param {Number} retentionDays - Rows older than this many days are deleted
   * @returns {Promise<Number>} Number of deleted rows
   */
  async cleanup(retentionDays = 90) {
    const cutoffDate = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString();

    const { changes } = await runStatement(
      'DELETE FROM aggregated_logs WHERE timestamp < ?',
      [cutoffDate]
    );

    logger.info(`[LogAggregator] Cleaned up ${changes} old log entries (retention: ${retentionDays} days)`);
    return changes;
  }

  /**
   * Export logs to external SIEM system
   * At most 10000 rows are exported; any `limit` in filters is overridden.
   * @param {Object} filters - Same filters as query()
   * @param {String} format - 'json' or 'csv'
   * @returns {Promise<String>} Serialised logs
   * @throws {Error} When the format is not supported
   */
  async export(filters = {}, format = 'json') {
    const logs = await this.query({ ...filters, limit: 10000 });

    if (format === 'json') {
      return JSON.stringify(logs, null, 2);
    }

    if (format === 'csv') {
      const headers = ['log_id', 'source', 'level', 'category', 'message', 'timestamp', 'ip_address', 'user_id'];
      const csv = [headers.join(',')];

      logs.forEach(log => {
        const row = headers.map(header => {
          // Only null/undefined become empty cells; 0 must survive.
          const raw = log[header];
          const value = raw === null || raw === undefined ? '' : raw;
          return `"${String(value).replace(/"/g, '""')}"`;
        });
        csv.push(row.join(','));
      });

      return csv.join('\n');
    }

    throw new Error(`Unsupported export format: ${format}`);
  }
}

// Create singleton instance
const logAggregator = new LogAggregator();

// Register default sources
logAggregator.registerSource('authentication', { priority: 'critical', retention: 365 });
logAggregator.registerSource('authorization', { priority: 'high', retention: 365 });
logAggregator.registerSource('security_audit', { priority: 'critical', retention: 365 });
logAggregator.registerSource('application', { priority: 'medium', retention: 90 });
logAggregator.registerSource('system', { priority: 'high', retention: 180 });
logAggregator.registerSource('access', { priority: 'low', retention: 30 });

module.exports = logAggregator;