streamflow/backend/utils/logAggregator.js

/**
 * Centralized Log Aggregation System (SIEM)
 * Consolidates logs from multiple sources into a tamper-evident repository
 * and provides holistic visibility across the infrastructure.
 */
const logger = require('./logger');
const { db } = require('../database/db');
const crypto = require('crypto');

class LogAggregator {
  constructor() {
    this.logSources = new Map();
    this.aggregationBuffer = [];
    this.bufferSize = 100; // Batch size that triggers an immediate flush
    this.flushInterval = 5000; // Flush every 5 seconds
    // Fire-and-forget init; catch so a failed table creation cannot
    // surface as an unhandled promise rejection
    this.initializeAggregation().catch((err) =>
      logger.error('[LogAggregator] Initialization failed:', err)
    );
  }

  /**
   * Initialize the aggregation system
   */
  async initializeAggregation() {
    // Create the aggregated_logs table if it does not already exist
    await this.createAggregatedLogsTable();
    // Start the periodic flush loop
    setInterval(() => this.flushBuffer(), this.flushInterval);
    logger.info('[LogAggregator] Initialized - SIEM mode active');
  }

  /**
   * Create the database table for aggregated logs
   */
  async createAggregatedLogsTable() {
    return new Promise((resolve, reject) => {
      db.run(`
        CREATE TABLE IF NOT EXISTS aggregated_logs (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          log_id TEXT UNIQUE NOT NULL,
          source TEXT NOT NULL,
          level TEXT NOT NULL,
          category TEXT NOT NULL,
          message TEXT NOT NULL,
          metadata TEXT,
          user_id INTEGER,
          ip_address TEXT,
          user_agent TEXT,
          signature TEXT NOT NULL,
          timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
          created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
      `, (err) => {
        if (err) {
          logger.error('[LogAggregator] Failed to create aggregated_logs table:', err);
          reject(err);
        } else {
          // Create indexes for fast querying (sqlite3 runs these serially by default)
          db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_source ON aggregated_logs(source, timestamp DESC)`);
          db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_level ON aggregated_logs(level, timestamp DESC)`);
          db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_category ON aggregated_logs(category, timestamp DESC)`);
          db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_user ON aggregated_logs(user_id, timestamp DESC)`);
          db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_timestamp ON aggregated_logs(timestamp DESC)`);
          resolve();
        }
      });
    });
  }

  /**
   * Register a log source
   * @param {String} sourceName - Name of the log source
   * @param {Object} config - Source configuration
   */
  registerSource(sourceName, config = {}) {
    this.logSources.set(sourceName, {
      name: sourceName,
      enabled: config.enabled !== false,
      priority: config.priority || 'medium',
      retention: config.retention || 90, // days
      ...config
    });
    logger.info(`[LogAggregator] Registered source: ${sourceName}`);
  }
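
  /*
   * Note: source registration is currently informational. aggregate() does
   * not consult this.logSources, so per-source `enabled` and `retention`
   * settings are recorded but not yet enforced; cleanup() applies a single
   * global retention window instead.
   */
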
  /**
   * Aggregate a log entry with a cryptographic signature
   * @param {String} source - Log source identifier
   * @param {String} level - Log level (info, warn, error, critical)
   * @param {String} category - Log category (auth, access, security, system, application)
   * @param {String} message - Log message
   * @param {Object} details - Additional details
   */
  async aggregate(source, level, category, message, details = {}) {
    const logId = this.generateLogId();
    const timestamp = new Date().toISOString();
    const logEntry = {
      log_id: logId,
      source,
      level,
      category,
      message,
      metadata: JSON.stringify({
        ...details,
        aggregatedAt: timestamp
      }),
      user_id: details.userId || null,
      ip_address: details.ip || null,
      user_agent: details.userAgent || null,
      timestamp
    };
    // Generate a cryptographic signature for log integrity
    logEntry.signature = this.generateSignature(logEntry);
    // Add to the buffer and flush immediately if the batch is full
    this.aggregationBuffer.push(logEntry);
    if (this.aggregationBuffer.length >= this.bufferSize) {
      await this.flushBuffer();
    }
    return logId;
  }

  /**
   * Generate a unique log ID
   */
  generateLogId() {
    const timestamp = Date.now();
    const random = crypto.randomBytes(8).toString('hex');
    return `LOG-${timestamp}-${random}`;
  }

  /**
   * Generate a cryptographic signature for a log entry:
   * an HMAC-SHA256 over the core fields, keyed with a secret, for
   * integrity verification. LOG_SIGNATURE_SECRET must be set in
   * production; the fallback value is a placeholder only.
   */
  generateSignature(logEntry) {
    const secret = process.env.LOG_SIGNATURE_SECRET || 'default-secret-change-in-production';
    const data = `${logEntry.log_id}|${logEntry.source}|${logEntry.level}|${logEntry.category}|${logEntry.message}|${logEntry.timestamp}`;
    return crypto
      .createHmac('sha256', secret)
      .update(data)
      .digest('hex');
  }
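
  /*
   * Note: the signature covers log_id, source, level, category, message,
   * and timestamp. It does NOT cover the metadata blob, so fields stored
   * only in metadata (userId, ip, userAgent, custom details) are not
   * tamper-evident under verifyIntegrity() below.
   */
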
  /**
   * Verify a log entry's signature with a constant-time comparison
   */
  verifySignature(logEntry) {
    const expected = this.generateSignature(logEntry);
    const actual = String(logEntry.signature || '');
    // crypto.timingSafeEqual throws on unequal lengths, so check first
    if (actual.length !== expected.length) return false;
    return crypto.timingSafeEqual(Buffer.from(actual), Buffer.from(expected));
  }

  /**
   * Flush the aggregation buffer to the database
   */
  async flushBuffer() {
    if (this.aggregationBuffer.length === 0) return;
    const batch = [...this.aggregationBuffer];
    this.aggregationBuffer = [];
    try {
      await this.bulkInsert(batch);
      logger.debug(`[LogAggregator] Flushed ${batch.length} log entries`);
    } catch (error) {
      logger.error('[LogAggregator] Failed to flush buffer:', error);
      // Re-queue failed entries so the next flush retries them
      this.aggregationBuffer.unshift(...batch);
    }
  }

  /**
   * Bulk insert log entries, chunked so each statement stays under
   * SQLite's default limit of 999 bound variables. With 11 columns per
   * row, that means at most 90 rows per INSERT; a single 100-row batch
   * would need 1100 variables and fail.
   */
  async bulkInsert(entries) {
    if (entries.length === 0) return;
    const ROWS_PER_STATEMENT = 90; // 90 * 11 = 990 variables
    for (let i = 0; i < entries.length; i += ROWS_PER_STATEMENT) {
      const chunk = entries.slice(i, i + ROWS_PER_STATEMENT);
      const placeholders = chunk.map(() => '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)').join(',');
      const values = chunk.flatMap(entry => [
        entry.log_id,
        entry.source,
        entry.level,
        entry.category,
        entry.message,
        entry.metadata,
        entry.user_id,
        entry.ip_address,
        entry.user_agent,
        entry.signature,
        entry.timestamp
      ]);
      await new Promise((resolve, reject) => {
        db.run(
          `INSERT INTO aggregated_logs
            (log_id, source, level, category, message, metadata, user_id, ip_address, user_agent, signature, timestamp)
          VALUES ${placeholders}`,
          values,
          (err) => (err ? reject(err) : resolve())
        );
      });
    }
  }

  /**
   * Query aggregated logs
   */
  async query(filters = {}) {
    const {
      source,
      level,
      category,
      userId,
      startDate,
      endDate,
      limit = 1000,
      offset = 0,
      orderBy = 'timestamp',
      order = 'DESC'
    } = filters;
    // orderBy/order are interpolated into the SQL string, so restrict them
    // to a whitelist to prevent SQL injection through the sort options
    const sortableColumns = ['id', 'timestamp', 'source', 'level', 'category', 'user_id', 'created_at'];
    const sortColumn = sortableColumns.includes(orderBy) ? orderBy : 'timestamp';
    const sortOrder = String(order).toUpperCase() === 'ASC' ? 'ASC' : 'DESC';
    const whereClause = [];
    const params = [];
    if (source) {
      whereClause.push('source = ?');
      params.push(source);
    }
    if (level) {
      if (Array.isArray(level)) {
        whereClause.push(`level IN (${level.map(() => '?').join(',')})`);
        params.push(...level);
      } else {
        whereClause.push('level = ?');
        params.push(level);
      }
    }
    if (category) {
      whereClause.push('category = ?');
      params.push(category);
    }
    if (userId) {
      whereClause.push('user_id = ?');
      params.push(userId);
    }
    if (startDate) {
      whereClause.push('timestamp >= ?');
      params.push(startDate);
    }
    if (endDate) {
      whereClause.push('timestamp <= ?');
      params.push(endDate);
    }
    const where = whereClause.length > 0 ? `WHERE ${whereClause.join(' AND ')}` : '';
    params.push(limit, offset);
    return new Promise((resolve, reject) => {
      db.all(
        `SELECT * FROM aggregated_logs ${where}
        ORDER BY ${sortColumn} ${sortOrder}
        LIMIT ? OFFSET ?`,
        params,
        (err, rows) => {
          if (err) reject(err);
          else resolve(rows);
        }
      );
    });
  }

  /**
   * Get log statistics for the trailing time range (in hours)
   */
  async getStatistics(timeRange = 24) {
    const startTime = new Date(Date.now() - timeRange * 60 * 60 * 1000).toISOString();
    return new Promise((resolve, reject) => {
      db.all(
        `SELECT
          source,
          level,
          category,
          COUNT(*) as count,
          MIN(timestamp) as first_seen,
          MAX(timestamp) as last_seen
        FROM aggregated_logs
        WHERE timestamp >= ?
        GROUP BY source, level, category
        ORDER BY count DESC`,
        [startTime],
        (err, rows) => {
          if (err) {
            reject(err);
            return;
          }
          const stats = {
            timeRange: `${timeRange} hours`,
            totalLogs: rows.reduce((sum, row) => sum + row.count, 0),
            bySource: {},
            byLevel: {},
            byCategory: {},
            breakdown: rows
          };
          // Roll the GROUP BY rows up into per-dimension totals
          rows.forEach(row => {
            stats.bySource[row.source] = (stats.bySource[row.source] || 0) + row.count;
            stats.byLevel[row.level] = (stats.byLevel[row.level] || 0) + row.count;
            stats.byCategory[row.category] = (stats.byCategory[row.category] || 0) + row.count;
          });
          resolve(stats);
        }
      );
    });
  }
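
  /*
   * Example result shape (values illustrative):
   *   {
   *     timeRange: '24 hours',
   *     totalLogs: 1523,
   *     bySource: { authentication: 612, application: 911 },
   *     byLevel: { info: 1400, warn: 98, error: 25 },
   *     byCategory: { auth: 612, system: 911 },
   *     breakdown: [ ...raw GROUP BY rows... ]
   *   }
   */
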
  /**
   * Verify log integrity
   * Checks whether stored log entries have been tampered with
   */
  async verifyIntegrity(logIds = null) {
    // Guard against an empty array, which would build invalid `IN ()` SQL
    const hasIds = Array.isArray(logIds) && logIds.length > 0;
    const query = hasIds
      ? `SELECT * FROM aggregated_logs WHERE log_id IN (${logIds.map(() => '?').join(',')})`
      : `SELECT * FROM aggregated_logs ORDER BY timestamp DESC LIMIT 1000`;
    const params = hasIds ? logIds : [];
    return new Promise((resolve, reject) => {
      db.all(query, params, (err, rows) => {
        if (err) {
          reject(err);
          return;
        }
        const results = {
          total: rows.length,
          verified: 0,
          tampered: 0,
          tamperedLogs: []
        };
        rows.forEach(row => {
          if (this.verifySignature(row)) {
            results.verified++;
          } else {
            results.tampered++;
            results.tamperedLogs.push({
              log_id: row.log_id,
              timestamp: row.timestamp,
              source: row.source
            });
          }
        });
        resolve(results);
      });
    });
  }

  /**
   * Clean up old logs based on the retention policy
   */
  async cleanup(retentionDays = 90) {
    const cutoffDate = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString();
    return new Promise((resolve, reject) => {
      db.run(
        'DELETE FROM aggregated_logs WHERE timestamp < ?',
        [cutoffDate],
        // Regular function (not arrow) so sqlite3 can expose this.changes
        function (err) {
          if (err) reject(err);
          else {
            logger.info(`[LogAggregator] Cleaned up ${this.changes} old log entries (retention: ${retentionDays} days)`);
            resolve(this.changes);
          }
        }
      );
    });
  }

  /**
   * Export logs for an external SIEM system
   */
  async export(filters = {}, format = 'json') {
    const logs = await this.query({ ...filters, limit: 10000 });
    if (format === 'json') {
      return JSON.stringify(logs, null, 2);
    } else if (format === 'csv') {
      const headers = ['log_id', 'source', 'level', 'category', 'message', 'timestamp', 'ip_address', 'user_id'];
      const csv = [headers.join(',')];
      logs.forEach(log => {
        const row = headers.map(header => {
          // ?? rather than || so falsy-but-valid values (e.g. 0) survive
          const value = log[header] ?? '';
          return `"${String(value).replace(/"/g, '""')}"`;
        });
        csv.push(row.join(','));
      });
      return csv.join('\n');
    }
    throw new Error(`Unsupported export format: ${format}`);
  }
}

// Create the singleton instance
const logAggregator = new LogAggregator();

// Register default sources
logAggregator.registerSource('authentication', { priority: 'critical', retention: 365 });
logAggregator.registerSource('authorization', { priority: 'high', retention: 365 });
logAggregator.registerSource('security_audit', { priority: 'critical', retention: 365 });
logAggregator.registerSource('application', { priority: 'medium', retention: 90 });
logAggregator.registerSource('system', { priority: 'high', retention: 180 });
logAggregator.registerSource('access', { priority: 'low', retention: 30 });

module.exports = logAggregator;
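
/*
 * Usage sketch (illustrative only, not part of the module). Assumes an
 * Express-style handler with a `req` object; the `details` field names
 * (userId, ip, userAgent) are the ones aggregate() actually reads, and
 * any extra fields land in the metadata blob.
 *
 *   const logAggregator = require('./utils/logAggregator');
 *
 *   // Record a failed login attempt
 *   await logAggregator.aggregate('authentication', 'warn', 'auth', 'Failed login attempt', {
 *     userId: user ? user.id : null,
 *     ip: req.ip,
 *     userAgent: req.get('user-agent'),
 *     username: req.body.username
 *   });
 *
 *   // Pull the last day of error/critical entries and spot-check integrity
 *   const recent = await logAggregator.query({
 *     level: ['error', 'critical'],
 *     startDate: new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(),
 *     limit: 100
 *   });
 *   const integrity = await logAggregator.verifyIntegrity(recent.map(r => r.log_id));
 */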