Initial commit: StreamFlow IPTV platform
This commit is contained in:
commit
73a8ae9ffd
1240 changed files with 278451 additions and 0 deletions
444
backend/utils/logAggregator.js
Normal file
444
backend/utils/logAggregator.js
Normal file
|
|
@ -0,0 +1,444 @@
|
|||
/**
|
||||
* Centralized Log Aggregation System (SIEM)
|
||||
* Consolidates logs from multiple sources into a protected repository
|
||||
* Provides holistic visibility across the infrastructure
|
||||
*/
|
||||
|
||||
const logger = require('./logger');
|
||||
const { db } = require('../database/db');
|
||||
const crypto = require('crypto');
|
||||
const fs = require('fs').promises;
|
||||
const path = require('path');
|
||||
|
||||
class LogAggregator {
|
||||
constructor() {
|
||||
this.logSources = new Map();
|
||||
this.aggregationBuffer = [];
|
||||
this.bufferSize = 100; // Batch size for bulk insert
|
||||
this.flushInterval = 5000; // 5 seconds
|
||||
this.initializeAggregation();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize aggregation system
|
||||
*/
|
||||
async initializeAggregation() {
|
||||
// Create aggregated_logs table if not exists
|
||||
await this.createAggregatedLogsTable();
|
||||
|
||||
// Start periodic flush
|
||||
setInterval(() => this.flushBuffer(), this.flushInterval);
|
||||
|
||||
logger.info('[LogAggregator] Initialized - SIEM mode active');
|
||||
}
|
||||
|
||||
/**
|
||||
* Create database table for aggregated logs
|
||||
*/
|
||||
async createAggregatedLogsTable() {
|
||||
return new Promise((resolve, reject) => {
|
||||
db.run(`
|
||||
CREATE TABLE IF NOT EXISTS aggregated_logs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
log_id TEXT UNIQUE NOT NULL,
|
||||
source TEXT NOT NULL,
|
||||
level TEXT NOT NULL,
|
||||
category TEXT NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
metadata TEXT,
|
||||
user_id INTEGER,
|
||||
ip_address TEXT,
|
||||
user_agent TEXT,
|
||||
signature TEXT NOT NULL,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
`, (err) => {
|
||||
if (err) {
|
||||
logger.error('[LogAggregator] Failed to create aggregated_logs table:', err);
|
||||
reject(err);
|
||||
} else {
|
||||
// Create indexes for fast querying
|
||||
db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_source ON aggregated_logs(source, timestamp DESC)`);
|
||||
db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_level ON aggregated_logs(level, timestamp DESC)`);
|
||||
db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_category ON aggregated_logs(category, timestamp DESC)`);
|
||||
db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_user ON aggregated_logs(user_id, timestamp DESC)`);
|
||||
db.run(`CREATE INDEX IF NOT EXISTS idx_aggregated_logs_timestamp ON aggregated_logs(timestamp DESC)`);
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a log source
|
||||
* @param {String} sourceName - Name of the log source
|
||||
* @param {Object} config - Source configuration
|
||||
*/
|
||||
registerSource(sourceName, config = {}) {
|
||||
this.logSources.set(sourceName, {
|
||||
name: sourceName,
|
||||
enabled: config.enabled !== false,
|
||||
priority: config.priority || 'medium',
|
||||
retention: config.retention || 90, // days
|
||||
...config
|
||||
});
|
||||
|
||||
logger.info(`[LogAggregator] Registered source: ${sourceName}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate log entry with cryptographic signature
|
||||
* @param {String} source - Log source identifier
|
||||
* @param {String} level - Log level (info, warn, error, critical)
|
||||
* @param {String} category - Log category (auth, access, security, system, application)
|
||||
* @param {String} message - Log message
|
||||
* @param {Object} details - Additional details
|
||||
*/
|
||||
async aggregate(source, level, category, message, details = {}) {
|
||||
const logId = this.generateLogId();
|
||||
const timestamp = new Date().toISOString();
|
||||
|
||||
const logEntry = {
|
||||
log_id: logId,
|
||||
source,
|
||||
level,
|
||||
category,
|
||||
message,
|
||||
metadata: JSON.stringify({
|
||||
...details,
|
||||
aggregatedAt: timestamp
|
||||
}),
|
||||
user_id: details.userId || null,
|
||||
ip_address: details.ip || null,
|
||||
user_agent: details.userAgent || null,
|
||||
timestamp
|
||||
};
|
||||
|
||||
// Generate cryptographic signature for log integrity
|
||||
logEntry.signature = this.generateSignature(logEntry);
|
||||
|
||||
// Add to buffer
|
||||
this.aggregationBuffer.push(logEntry);
|
||||
|
||||
// Flush if buffer is full
|
||||
if (this.aggregationBuffer.length >= this.bufferSize) {
|
||||
await this.flushBuffer();
|
||||
}
|
||||
|
||||
return logId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate unique log ID
|
||||
*/
|
||||
generateLogId() {
|
||||
const timestamp = Date.now();
|
||||
const random = crypto.randomBytes(8).toString('hex');
|
||||
return `LOG-${timestamp}-${random}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate cryptographic signature for log entry
|
||||
* SHA-256 HMAC with secret key for integrity verification
|
||||
*/
|
||||
generateSignature(logEntry) {
|
||||
const secret = process.env.LOG_SIGNATURE_SECRET || 'default-secret-change-in-production';
|
||||
const data = `${logEntry.log_id}|${logEntry.source}|${logEntry.level}|${logEntry.category}|${logEntry.message}|${logEntry.timestamp}`;
|
||||
|
||||
return crypto
|
||||
.createHmac('sha256', secret)
|
||||
.update(data)
|
||||
.digest('hex');
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify log entry signature
|
||||
*/
|
||||
verifySignature(logEntry) {
|
||||
const expectedSignature = this.generateSignature(logEntry);
|
||||
return logEntry.signature === expectedSignature;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush aggregation buffer to database
|
||||
*/
|
||||
async flushBuffer() {
|
||||
if (this.aggregationBuffer.length === 0) return;
|
||||
|
||||
const batch = [...this.aggregationBuffer];
|
||||
this.aggregationBuffer = [];
|
||||
|
||||
try {
|
||||
await this.bulkInsert(batch);
|
||||
logger.debug(`[LogAggregator] Flushed ${batch.length} log entries`);
|
||||
} catch (error) {
|
||||
logger.error('[LogAggregator] Failed to flush buffer:', error);
|
||||
// Re-add failed entries to buffer
|
||||
this.aggregationBuffer.unshift(...batch);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bulk insert log entries
|
||||
*/
|
||||
async bulkInsert(entries) {
|
||||
if (entries.length === 0) return;
|
||||
|
||||
const placeholders = entries.map(() => '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)').join(',');
|
||||
const values = entries.flatMap(entry => [
|
||||
entry.log_id,
|
||||
entry.source,
|
||||
entry.level,
|
||||
entry.category,
|
||||
entry.message,
|
||||
entry.metadata,
|
||||
entry.user_id,
|
||||
entry.ip_address,
|
||||
entry.user_agent,
|
||||
entry.signature,
|
||||
entry.timestamp
|
||||
]);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
db.run(
|
||||
`INSERT INTO aggregated_logs
|
||||
(log_id, source, level, category, message, metadata, user_id, ip_address, user_agent, signature, timestamp)
|
||||
VALUES ${placeholders}`,
|
||||
values,
|
||||
(err) => {
|
||||
if (err) reject(err);
|
||||
else resolve();
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Query aggregated logs
|
||||
*/
|
||||
async query(filters = {}) {
|
||||
const {
|
||||
source,
|
||||
level,
|
||||
category,
|
||||
userId,
|
||||
startDate,
|
||||
endDate,
|
||||
limit = 1000,
|
||||
offset = 0,
|
||||
orderBy = 'timestamp',
|
||||
order = 'DESC'
|
||||
} = filters;
|
||||
|
||||
let whereClause = [];
|
||||
let params = [];
|
||||
|
||||
if (source) {
|
||||
whereClause.push('source = ?');
|
||||
params.push(source);
|
||||
}
|
||||
|
||||
if (level) {
|
||||
if (Array.isArray(level)) {
|
||||
whereClause.push(`level IN (${level.map(() => '?').join(',')})`);
|
||||
params.push(...level);
|
||||
} else {
|
||||
whereClause.push('level = ?');
|
||||
params.push(level);
|
||||
}
|
||||
}
|
||||
|
||||
if (category) {
|
||||
whereClause.push('category = ?');
|
||||
params.push(category);
|
||||
}
|
||||
|
||||
if (userId) {
|
||||
whereClause.push('user_id = ?');
|
||||
params.push(userId);
|
||||
}
|
||||
|
||||
if (startDate) {
|
||||
whereClause.push('timestamp >= ?');
|
||||
params.push(startDate);
|
||||
}
|
||||
|
||||
if (endDate) {
|
||||
whereClause.push('timestamp <= ?');
|
||||
params.push(endDate);
|
||||
}
|
||||
|
||||
const where = whereClause.length > 0 ? `WHERE ${whereClause.join(' AND ')}` : '';
|
||||
params.push(limit, offset);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
db.all(
|
||||
`SELECT * FROM aggregated_logs ${where}
|
||||
ORDER BY ${orderBy} ${order}
|
||||
LIMIT ? OFFSET ?`,
|
||||
params,
|
||||
(err, rows) => {
|
||||
if (err) reject(err);
|
||||
else resolve(rows);
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get log statistics
|
||||
*/
|
||||
async getStatistics(timeRange = 24) {
|
||||
const startTime = new Date(Date.now() - timeRange * 60 * 60 * 1000).toISOString();
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
db.all(
|
||||
`SELECT
|
||||
source,
|
||||
level,
|
||||
category,
|
||||
COUNT(*) as count,
|
||||
MIN(timestamp) as first_seen,
|
||||
MAX(timestamp) as last_seen
|
||||
FROM aggregated_logs
|
||||
WHERE timestamp >= ?
|
||||
GROUP BY source, level, category
|
||||
ORDER BY count DESC`,
|
||||
[startTime],
|
||||
(err, rows) => {
|
||||
if (err) reject(err);
|
||||
else {
|
||||
const stats = {
|
||||
timeRange: `${timeRange} hours`,
|
||||
totalLogs: rows.reduce((sum, row) => sum + row.count, 0),
|
||||
bySource: {},
|
||||
byLevel: {},
|
||||
byCategory: {},
|
||||
breakdown: rows
|
||||
};
|
||||
|
||||
rows.forEach(row => {
|
||||
// By source
|
||||
if (!stats.bySource[row.source]) stats.bySource[row.source] = 0;
|
||||
stats.bySource[row.source] += row.count;
|
||||
|
||||
// By level
|
||||
if (!stats.byLevel[row.level]) stats.byLevel[row.level] = 0;
|
||||
stats.byLevel[row.level] += row.count;
|
||||
|
||||
// By category
|
||||
if (!stats.byCategory[row.category]) stats.byCategory[row.category] = 0;
|
||||
stats.byCategory[row.category] += row.count;
|
||||
});
|
||||
|
||||
resolve(stats);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify log integrity
|
||||
* Checks if log entries have been tampered with
|
||||
*/
|
||||
async verifyIntegrity(logIds = null) {
|
||||
const query = logIds
|
||||
? `SELECT * FROM aggregated_logs WHERE log_id IN (${logIds.map(() => '?').join(',')})`
|
||||
: `SELECT * FROM aggregated_logs ORDER BY timestamp DESC LIMIT 1000`;
|
||||
|
||||
const params = logIds || [];
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
db.all(query, params, (err, rows) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
const results = {
|
||||
total: rows.length,
|
||||
verified: 0,
|
||||
tampered: 0,
|
||||
tamperedLogs: []
|
||||
};
|
||||
|
||||
rows.forEach(row => {
|
||||
if (this.verifySignature(row)) {
|
||||
results.verified++;
|
||||
} else {
|
||||
results.tampered++;
|
||||
results.tamperedLogs.push({
|
||||
log_id: row.log_id,
|
||||
timestamp: row.timestamp,
|
||||
source: row.source
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
resolve(results);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup old logs based on retention policy
|
||||
*/
|
||||
async cleanup(retentionDays = 90) {
|
||||
const cutoffDate = new Date(Date.now() - retentionDays * 24 * 60 * 60 * 1000).toISOString();
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
db.run(
|
||||
'DELETE FROM aggregated_logs WHERE timestamp < ?',
|
||||
[cutoffDate],
|
||||
function(err) {
|
||||
if (err) reject(err);
|
||||
else {
|
||||
logger.info(`[LogAggregator] Cleaned up ${this.changes} old log entries (retention: ${retentionDays} days)`);
|
||||
resolve(this.changes);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Export logs to external SIEM system
|
||||
*/
|
||||
async export(filters = {}, format = 'json') {
|
||||
const logs = await this.query({ ...filters, limit: 10000 });
|
||||
|
||||
if (format === 'json') {
|
||||
return JSON.stringify(logs, null, 2);
|
||||
} else if (format === 'csv') {
|
||||
const headers = ['log_id', 'source', 'level', 'category', 'message', 'timestamp', 'ip_address', 'user_id'];
|
||||
const csv = [headers.join(',')];
|
||||
|
||||
logs.forEach(log => {
|
||||
const row = headers.map(header => {
|
||||
const value = log[header] || '';
|
||||
return `"${String(value).replace(/"/g, '""')}"`;
|
||||
});
|
||||
csv.push(row.join(','));
|
||||
});
|
||||
|
||||
return csv.join('\n');
|
||||
}
|
||||
|
||||
throw new Error(`Unsupported export format: ${format}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Create singleton instance
|
||||
const logAggregator = new LogAggregator();
|
||||
|
||||
// Register default sources
|
||||
logAggregator.registerSource('authentication', { priority: 'critical', retention: 365 });
|
||||
logAggregator.registerSource('authorization', { priority: 'high', retention: 365 });
|
||||
logAggregator.registerSource('security_audit', { priority: 'critical', retention: 365 });
|
||||
logAggregator.registerSource('application', { priority: 'medium', retention: 90 });
|
||||
logAggregator.registerSource('system', { priority: 'high', retention: 180 });
|
||||
logAggregator.registerSource('access', { priority: 'low', retention: 30 });
|
||||
|
||||
module.exports = logAggregator;
|
||||
Loading…
Add table
Add a link
Reference in a new issue