streamflow/backend/jobs/logManagement.js

/**
 * Log Management & Retention System (CWE-53 Compliance)
 * Automated cleanup, archival, integrity verification, and monitoring
 */
const logger = require('../utils/logger');
const logAggregator = require('../utils/logAggregator');
const SecurityAuditLogger = require('../utils/securityAudit');
const fs = require('fs').promises;
const path = require('path');
const zlib = require('zlib');
const { promisify } = require('util');
const gzip = promisify(zlib.gzip);

class LogManagement {
  constructor() {
    this.archiveDir = path.join(__dirname, '../../data/log-archives');
    this.initialized = false;
  }

  /**
   * Initialize log management system
   */
  async initialize() {
    if (this.initialized) return;
    try {
      // Ensure archive directory exists
      await fs.mkdir(this.archiveDir, { recursive: true, mode: 0o700 });

      // Schedule daily log cleanup (runs at 2 AM)
      this.scheduleDailyCleanup();

      // Schedule hourly integrity verification
      this.scheduleIntegrityChecks();

      // Schedule weekly archival
      this.scheduleWeeklyArchival();

      logger.info('[LogManagement] Initialized - Automated cleanup, archival, and integrity checks active');
      this.initialized = true;
    } catch (error) {
      logger.error('[LogManagement] Failed to initialize:', error);
    }
  }
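
  /*
   * Usage sketch (illustrative, not enforced by this module): initialize()
   * is meant to be called once at application startup, after the logger and
   * log aggregator are available. The entry-point filename below is an
   * assumption about the host app.
   *
   *   // in a hypothetical async bootstrap in server.js:
   *   const logManagement = require('./jobs/logManagement');
   *   await logManagement.initialize(); // resolves even on failure; errors are logged
   */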

  /**
   * Schedule daily log cleanup at 2 AM
   */
  scheduleDailyCleanup() {
    const scheduleNextCleanup = () => {
      const now = new Date();
      const next2AM = new Date();
      next2AM.setHours(2, 0, 0, 0);
      // If it's past 2 AM today, schedule for tomorrow
      if (now > next2AM) {
        next2AM.setDate(next2AM.getDate() + 1);
      }
      const msUntil2AM = next2AM - now;
      setTimeout(async () => {
        await this.performDailyCleanup();
        scheduleNextCleanup(); // Schedule next day
      }, msUntil2AM);
      logger.info(`[LogManagement] Daily cleanup scheduled for ${next2AM.toISOString()}`);
    };
    scheduleNextCleanup();
  }
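
  /*
   * Note on the pattern above: the chained setTimeout recomputes the delay
   * after every run, so the job does not drift the way a fixed setInterval
   * would. setHours() works in server-local time, so a DST change simply
   * shifts the next run to the new local 2 AM.
   */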

  /**
   * Schedule hourly integrity verification
   */
  scheduleIntegrityChecks() {
    // Run immediately on startup
    this.verifyLogIntegrity();

    // Then run every hour
    setInterval(() => {
      this.verifyLogIntegrity();
    }, 60 * 60 * 1000); // 1 hour

    logger.info('[LogManagement] Hourly integrity checks scheduled');
  }
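
  /*
   * verifyLogIntegrity() is intentionally fire-and-forget here: it is
   * async, but it catches its own errors and never rejects, so no
   * unhandled-rejection handling is needed at the call sites above.
   */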

  /**
   * Schedule weekly archival (every Sunday at 3 AM)
   */
  scheduleWeeklyArchival() {
    const scheduleNextArchival = () => {
      const now = new Date();
      const nextSunday3AM = new Date();
      nextSunday3AM.setHours(3, 0, 0, 0);
      // Days until the coming Sunday (0 = today is Sunday)
      const daysUntilSunday = (7 - now.getDay()) % 7;
      nextSunday3AM.setDate(nextSunday3AM.getDate() + daysUntilSunday);
      // If Sunday 3 AM has already passed this week, wait until next Sunday
      if (nextSunday3AM <= now) {
        nextSunday3AM.setDate(nextSunday3AM.getDate() + 7);
      }
      const msUntilArchival = nextSunday3AM - now;
      setTimeout(async () => {
        await this.performWeeklyArchival();
        scheduleNextArchival(); // Schedule next week
      }, msUntilArchival);
      logger.info(`[LogManagement] Weekly archival scheduled for ${nextSunday3AM.toISOString()}`);
    };
    scheduleNextArchival();
  }
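
  /*
   * Worked example of the arithmetic above: on a Wednesday (getDay() === 3),
   * daysUntilSunday = (7 - 3) % 7 = 4, so the target is Sunday 3 AM local
   * time, four days out. On a Sunday at 1 AM, daysUntilSunday = 0 and the
   * 3 AM target is still ahead, so the run happens the same day.
   */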

  /**
   * Perform daily log cleanup with archival
   */
  async performDailyCleanup() {
    try {
      logger.info('[LogManagement] Starting daily log cleanup...');

      // Get retention settings from environment or defaults
      const auditRetention = parseInt(process.env.AUDIT_LOG_RETENTION, 10) || 90;
      const aggregatedRetention = parseInt(process.env.AGGREGATED_LOG_RETENTION, 10) || 90;

      // Archive logs before deletion
      await this.archiveOldLogs(auditRetention);

      // Cleanup audit logs
      const auditDeleted = await SecurityAuditLogger.cleanupOldLogs(auditRetention);
      logger.info(`[LogManagement] Cleaned up ${auditDeleted} old security audit logs (>${auditRetention} days)`);

      // Cleanup aggregated logs
      const aggregatedDeleted = await logAggregator.cleanup(aggregatedRetention);
      logger.info(`[LogManagement] Cleaned up ${aggregatedDeleted} old aggregated logs (>${aggregatedRetention} days)`);

      // Cleanup old file logs (keep last 30 days of rotated files)
      await this.cleanupFileLogRotations();

      // Log the cleanup event
      await SecurityAuditLogger.logSystemEvent('log_cleanup', true, {
        auditDeleted,
        aggregatedDeleted,
        retentionDays: { audit: auditRetention, aggregated: aggregatedRetention }
      });

      logger.info('[LogManagement] Daily log cleanup completed successfully');
    } catch (error) {
      logger.error('[LogManagement] Error during daily cleanup:', error);
      await SecurityAuditLogger.logSystemEvent('log_cleanup', false, {
        error: error.message
      });
    }
  }
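
  /*
   * Configuration sketch: both retention knobs are plain day counts read
   * from the environment, falling back to 90. Example .env values
   * (illustrative):
   *
   *   AUDIT_LOG_RETENTION=90
   *   AGGREGATED_LOG_RETENTION=90
   */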

  /**
   * Archive old logs to compressed files before deletion
   */
  async archiveOldLogs(retentionDays) {
    try {
      const cutoffDate = new Date();
      cutoffDate.setDate(cutoffDate.getDate() - retentionDays);

      // Query logs that will be deleted
      const logsToArchive = await logAggregator.query({
        endDate: cutoffDate.toISOString(),
        limit: 10000
      });

      if (logsToArchive.length === 0) {
        logger.info('[LogManagement] No logs to archive');
        return;
      }

      // Create archive filename
      const archiveFilename = `logs-archive-${cutoffDate.toISOString().split('T')[0]}-${Date.now()}.json.gz`;
      const archivePath = path.join(this.archiveDir, archiveFilename);

      // Compress and save
      const logsJson = JSON.stringify(logsToArchive, null, 2);
      const compressed = await gzip(logsJson);
      await fs.writeFile(archivePath, compressed, { mode: 0o600 });

      // Re-apply restrictive permissions: the mode passed to writeFile is
      // subject to the process umask, while chmod is not
      await fs.chmod(archivePath, 0o600);

      logger.info(`[LogManagement] Archived ${logsToArchive.length} logs to ${archiveFilename} (${(compressed.length / 1024).toFixed(2)} KB)`);
      return archiveFilename;
    } catch (error) {
      logger.error('[LogManagement] Error archiving logs:', error);
      throw error;
    }
  }
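
  /*
   * Caveat worth knowing: the query above is capped at 10,000 rows, while
   * the subsequent cleanup in performDailyCleanup() deletes by age alone.
   * If more than 10,000 logs are past the cutoff, the overflow is deleted
   * without ever landing in an archive.
   */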

  /**
   * Perform weekly full archival
   */
  async performWeeklyArchival() {
    try {
      logger.info('[LogManagement] Starting weekly full log archival...');

      // Archive the week that ended seven days ago (logs aged 7-14 days)
      const lastWeek = new Date();
      lastWeek.setDate(lastWeek.getDate() - 7);
      const allLogs = await logAggregator.query({
        startDate: new Date(lastWeek.getTime() - 7 * 24 * 60 * 60 * 1000).toISOString(),
        endDate: lastWeek.toISOString(),
        limit: 50000
      });

      if (allLogs.length > 0) {
        const archiveFilename = `logs-weekly-${lastWeek.toISOString().split('T')[0]}.json.gz`;
        const archivePath = path.join(this.archiveDir, archiveFilename);
        const logsJson = JSON.stringify(allLogs, null, 2);
        const compressed = await gzip(logsJson);
        await fs.writeFile(archivePath, compressed, { mode: 0o600 });
        logger.info(`[LogManagement] Weekly archive complete: ${archiveFilename} (${allLogs.length} logs, ${(compressed.length / 1024 / 1024).toFixed(2)} MB)`);
      }

      // Cleanup old archives (keep 1 year)
      await this.cleanupOldArchives(365);

      await SecurityAuditLogger.logSystemEvent('log_weekly_archive', true, {
        logsArchived: allLogs.length
      });
    } catch (error) {
      logger.error('[LogManagement] Error during weekly archival:', error);
      await SecurityAuditLogger.logSystemEvent('log_weekly_archive', false, {
        error: error.message
      });
    }
  }

  /**
   * Verify log integrity and alert on tampering
   */
  async verifyLogIntegrity() {
    try {
      logger.debug('[LogManagement] Starting log integrity verification...');
      const result = await logAggregator.verifyIntegrity();

      if (result.tampered > 0) {
        // CRITICAL: Log tampering detected!
        logger.error(`[LogManagement] ⚠️ LOG TAMPERING DETECTED! ${result.tampered} tampered logs found`);

        // Log to security audit
        await SecurityAuditLogger.logSecurityIncident('log_tampering', {
          tamperedCount: result.tampered,
          verifiedCount: result.verified,
          totalCount: result.total,
          tamperedLogs: result.tamperedLogs.slice(0, 10) // First 10 for details
        });

        // In production, this should trigger alerts (email, Slack, PagerDuty, etc.)
        logger.error('[LogManagement] Security team should be notified immediately');
      } else {
        logger.debug(`[LogManagement] Integrity check passed: ${result.verified} logs verified`);
      }
      return result;
    } catch (error) {
      logger.error('[LogManagement] Error during integrity verification:', error);
      return null;
    }
  }
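
  /*
   * Shape of the verifyIntegrity() result as consumed above (inferred from
   * this call site, not from logAggregator's own docs):
   *   { total, verified, tampered, tamperedLogs: [...] }
   * A null return from this method always means the check itself failed,
   * never that tampering was found.
   */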

  /**
   * Cleanup rotated file logs older than X days
   */
  async cleanupFileLogRotations() {
    try {
      const logsDir = path.join(__dirname, '../../logs');
      const files = await fs.readdir(logsDir);
      const now = Date.now();
      const maxAge = 30 * 24 * 60 * 60 * 1000; // 30 days
      let deletedCount = 0;

      for (const file of files) {
        // Only process rotated files (*.log.1, *.log.2, etc.)
        if (file.match(/\.log\.\d+$/)) {
          const filePath = path.join(logsDir, file);
          const stats = await fs.stat(filePath);
          const fileAge = now - stats.mtime.getTime();
          if (fileAge > maxAge) {
            await fs.unlink(filePath);
            deletedCount++;
            logger.debug(`[LogManagement] Deleted old rotated log: ${file}`);
          }
        }
      }

      if (deletedCount > 0) {
        logger.info(`[LogManagement] Cleaned up ${deletedCount} old rotated log files`);
      }
    } catch (error) {
      logger.error('[LogManagement] Error cleaning up rotated logs:', error);
    }
  }

  /**
   * Cleanup old archives (keep for specified days)
   */
  async cleanupOldArchives(retentionDays) {
    try {
      const files = await fs.readdir(this.archiveDir);
      const now = Date.now();
      const maxAge = retentionDays * 24 * 60 * 60 * 1000;
      let deletedCount = 0;

      for (const file of files) {
        if (file.endsWith('.json.gz')) {
          const filePath = path.join(this.archiveDir, file);
          const stats = await fs.stat(filePath);
          const fileAge = now - stats.mtime.getTime();
          if (fileAge > maxAge) {
            await fs.unlink(filePath);
            deletedCount++;
            logger.debug(`[LogManagement] Deleted old archive: ${file}`);
          }
        }
      }

      if (deletedCount > 0) {
        logger.info(`[LogManagement] Cleaned up ${deletedCount} old log archives`);
      }
    } catch (error) {
      logger.error('[LogManagement] Error cleaning up old archives:', error);
    }
  }

  /**
   * Get log management statistics
   */
  async getStatistics() {
    try {
      const stats = await logAggregator.getStatistics(30);

      // Get archive info
      const archives = await fs.readdir(this.archiveDir);
      const archiveFiles = archives.filter(f => f.endsWith('.json.gz'));
      let totalArchiveSize = 0;
      for (const file of archiveFiles) {
        const filePath = path.join(this.archiveDir, file);
        const stat = await fs.stat(filePath);
        totalArchiveSize += stat.size;
      }

      return {
        ...stats,
        archives: {
          count: archiveFiles.length,
          totalSize: totalArchiveSize,
          totalSizeMB: (totalArchiveSize / 1024 / 1024).toFixed(2)
        }
      };
    } catch (error) {
      logger.error('[LogManagement] Error getting statistics:', error);
      return null;
    }
  }
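
  /*
   * Illustrative return value (the numbers are made up; the spread fields
   * come from logAggregator.getStatistics and are not enumerated here):
   *   { ...aggregatorStats, archives: { count: 12, totalSize: 3145728, totalSizeMB: '3.00' } }
   */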

  /**
   * List available log archives
   */
  async listArchives() {
    try {
      const files = await fs.readdir(this.archiveDir);
      const archives = [];
      for (const file of files) {
        if (file.endsWith('.json.gz')) {
          const filePath = path.join(this.archiveDir, file);
          const stats = await fs.stat(filePath);
          archives.push({
            filename: file,
            size: stats.size,
            sizeMB: (stats.size / 1024 / 1024).toFixed(2),
            // Archives are write-once, so mtime is effectively creation time
            created: stats.mtime.toISOString()
          });
        }
      }
      // Sort newest first
      archives.sort((a, b) => new Date(b.created) - new Date(a.created));
      return archives;
    } catch (error) {
      logger.error('[LogManagement] Error listing archives:', error);
      return [];
    }
  }

  /**
   * Manual trigger for log cleanup (admin function)
   */
  async manualCleanup(retentionDays) {
    logger.info(`[LogManagement] Manual cleanup triggered (retention: ${retentionDays} days)`);
    const auditDeleted = await SecurityAuditLogger.cleanupOldLogs(retentionDays);
    const aggregatedDeleted = await logAggregator.cleanup(retentionDays);
    await SecurityAuditLogger.logAdminActivity(null, 'manual_log_cleanup', {
      auditDeleted,
      aggregatedDeleted,
      retentionDays
    });
    return { auditDeleted, aggregatedDeleted };
  }

  /**
   * Manual trigger for integrity verification (admin function)
   */
  async manualIntegrityCheck() {
    logger.info('[LogManagement] Manual integrity check triggered');
    return await this.verifyLogIntegrity();
  }
}
// Create singleton instance
const logManagement = new LogManagement();
module.exports = logManagement;
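
/*
 * Admin-route wiring sketch. Everything below is illustrative: the Express
 * router, route paths, and requireAdmin middleware are assumptions about the
 * host application, not exports of this module.
 *
 *   const logManagement = require('./jobs/logManagement');
 *
 *   router.post('/admin/logs/cleanup', requireAdmin, async (req, res) => {
 *     const days = parseInt(req.body.retentionDays, 10) || 90;
 *     res.json(await logManagement.manualCleanup(days));
 *   });
 *
 *   router.get('/admin/logs/archives', requireAdmin, async (req, res) => {
 *     res.json(await logManagement.listArchives());
 *   });
 */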