// streamflow/backend/utils/alertSystem.js
//
// 705 lines
// 20 KiB
// JavaScript
// Raw Normal View History

/**
* Real-time Alert System
* Automated notification system for security events
* Supports multiple notification channels and alert rules
*/
const logger = require('./logger');
const logAggregator = require('./logAggregator');
const { db } = require('../database/db');
const EventEmitter = require('events');
const responseProtocolManager = require('./responseProtocolManager');
/**
 * Rule-driven security alert engine.
 *
 * Persists alerts and alert rules through the shared `db` handle, keeps the
 * enabled rules cached in `alertRules`, and emits the following events:
 *   - 'alert'              when an alert row has been inserted
 *   - 'notification'       when an in-app notification is delivered
 *   - 'alert_acknowledged' / 'alert_resolved' on status changes
 *
 * `ready` is a promise that settles once start-up has finished; it resolves
 * to `true` on success and `false` if initialization failed (the failure is
 * logged instead of surfacing as an unhandled rejection).
 */
class AlertSystem extends EventEmitter {
  constructor() {
    super();
    this.activeAlerts = new Map();
    this.alertRules = new Map();
    // Constructors cannot be async, so keep the start-up promise around and
    // make sure a failure is always observed.
    this.ready = this.initialize().then(
      () => true,
      (err) => {
        logger.error('[AlertSystem] Initialization failed:', err);
        return false;
      }
    );
  }

  /**
   * Promise wrapper around `db.run`.
   * @param {string} sql
   * @param {Array} [params]
   * @returns {Promise<void>} rejects with the driver error, if any
   */
  _dbRun(sql, params = []) {
    return new Promise((resolve, reject) => {
      db.run(sql, params, (err) => {
        if (err) reject(err);
        else resolve();
      });
    });
  }

  /**
   * Promise wrapper around `db.get`.
   * @param {string} sql
   * @param {Array} [params]
   * @returns {Promise<Object|undefined>} the row handed to the callback
   */
  _dbGet(sql, params = []) {
    return new Promise((resolve, reject) => {
      db.get(sql, params, (err, row) => {
        if (err) reject(err);
        else resolve(row);
      });
    });
  }

  /**
   * Promise wrapper around `db.all`.
   * @param {string} sql
   * @param {Array} [params]
   * @returns {Promise<Array>} the rows handed to the callback
   */
  _dbAll(sql, params = []) {
    return new Promise((resolve, reject) => {
      db.all(sql, params, (err, rows) => {
        if (err) reject(err);
        else resolve(rows);
      });
    });
  }

  /**
   * Parse a JSON column without throwing.
   * @param {*} raw - JSON text; non-string values are returned unchanged
   * @param {*} fallback - returned for null/undefined/empty or malformed input
   * @returns {*}
   */
  _parseJson(raw, fallback) {
    if (raw === null || raw === undefined || raw === '') return fallback;
    if (typeof raw !== 'string') return raw;
    try {
      return JSON.parse(raw);
    } catch (err) {
      return fallback;
    }
  }

  /**
   * Initialize alert system: create both tables, seed the default rules and
   * load the enabled rules into memory.
   * @returns {Promise<void>} rejects if any step fails
   */
  async initialize() {
    await this.createAlertsTable();
    await this.createAlertRulesTable();
    await this.loadAlertRules();
    logger.info('[AlertSystem] Initialized - Real-time monitoring enabled');
  }

  /**
   * Create the security_alerts table and its indexes (all idempotent).
   * Index creation is awaited so that a failure rejects instead of being lost.
   * @returns {Promise<void>}
   */
  async createAlertsTable() {
    await this._dbRun(`
      CREATE TABLE IF NOT EXISTS security_alerts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        alert_id TEXT UNIQUE NOT NULL,
        rule_id TEXT,
        severity TEXT NOT NULL,
        title TEXT NOT NULL,
        description TEXT NOT NULL,
        affected_entity TEXT,
        source_data TEXT,
        status TEXT DEFAULT 'active',
        acknowledged_at DATETIME,
        acknowledged_by INTEGER,
        resolved_at DATETIME,
        resolved_by INTEGER,
        resolution_notes TEXT,
        notification_sent INTEGER DEFAULT 0,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    `);
    await this._dbRun('CREATE INDEX IF NOT EXISTS idx_alerts_severity ON security_alerts(severity, created_at DESC)');
    await this._dbRun('CREATE INDEX IF NOT EXISTS idx_alerts_status ON security_alerts(status, created_at DESC)');
    await this._dbRun('CREATE INDEX IF NOT EXISTS idx_alerts_rule ON security_alerts(rule_id)');
  }

  /**
   * Create the alert_rules table, then seed the default rules.
   * @returns {Promise<void>} rejects if table creation or seeding fails
   */
  async createAlertRulesTable() {
    await this._dbRun(`
      CREATE TABLE IF NOT EXISTS alert_rules (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        rule_id TEXT UNIQUE NOT NULL,
        name TEXT NOT NULL,
        description TEXT,
        rule_type TEXT NOT NULL,
        condition TEXT NOT NULL,
        severity TEXT NOT NULL,
        enabled INTEGER DEFAULT 1,
        notification_channels TEXT,
        cooldown_minutes INTEGER DEFAULT 10,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    `);
    await this.createDefaultRules();
  }

  /**
   * Insert the built-in rules. `INSERT OR IGNORE` leaves rows whose rule_id
   * already exists untouched, so this is safe to run on every start-up.
   * @returns {Promise<void>}
   */
  async createDefaultRules() {
    const defaultRules = [
      {
        rule_id: 'RULE-BRUTE-FORCE',
        name: 'Brute Force Detection',
        description: 'Alert on brute force attack patterns',
        rule_type: 'anomaly',
        condition: { anomaly_type: 'brute_force_attack' },
        severity: 'critical',
        notification_channels: ['in_app', 'email'],
        cooldown_minutes: 10
      },
      {
        rule_id: 'RULE-PRIVILEGE-ESC',
        name: 'Privilege Escalation',
        description: 'Alert on privilege escalation attempts',
        rule_type: 'anomaly',
        condition: { anomaly_type: 'privilege_escalation' },
        severity: 'critical',
        notification_channels: ['in_app', 'email'],
        cooldown_minutes: 5
      },
      {
        rule_id: 'RULE-DATA-EXFIL',
        name: 'Data Exfiltration',
        description: 'Alert on potential data exfiltration',
        rule_type: 'anomaly',
        condition: { anomaly_type: 'data_exfiltration' },
        severity: 'high',
        notification_channels: ['in_app', 'email'],
        cooldown_minutes: 15
      },
      {
        rule_id: 'RULE-THREAT-CRITICAL',
        name: 'Critical Threat Level',
        description: 'Alert when threat score exceeds 80',
        rule_type: 'threshold',
        condition: { metric: 'threat_score', operator: '>=', value: 80 },
        severity: 'critical',
        notification_channels: ['in_app', 'email'],
        cooldown_minutes: 30
      },
      {
        rule_id: 'RULE-SUSPICIOUS-IP',
        name: 'Suspicious IP Activity',
        description: 'Alert on suspicious IP behavior',
        rule_type: 'anomaly',
        condition: { anomaly_type: 'suspicious_ip' },
        severity: 'high',
        notification_channels: ['in_app'],
        cooldown_minutes: 20
      },
      {
        rule_id: 'RULE-SESSION-ANOMALY',
        name: 'Session Anomaly',
        description: 'Alert on unusual session patterns',
        rule_type: 'anomaly',
        condition: { anomaly_type: 'session_anomaly' },
        severity: 'medium',
        notification_channels: ['in_app'],
        cooldown_minutes: 30
      }
    ];
    for (const rule of defaultRules) {
      await this._dbRun(
        `INSERT OR IGNORE INTO alert_rules
          (rule_id, name, description, rule_type, condition, severity, notification_channels, cooldown_minutes)
          VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
        [
          rule.rule_id,
          rule.name,
          rule.description,
          rule.rule_type,
          JSON.stringify(rule.condition),
          rule.severity,
          JSON.stringify(rule.notification_channels),
          rule.cooldown_minutes
        ]
      );
    }
    logger.info('[AlertSystem] Default alert rules created');
  }

  /**
   * Load enabled alert rules into memory, replacing the current cache.
   * A rule whose `condition` is not a valid JSON object is skipped with a
   * warning; a missing or malformed `notification_channels` becomes `[]`.
   * @returns {Promise<number>} number of rules now cached
   */
  async loadAlertRules() {
    const rows = await this._dbAll('SELECT * FROM alert_rules WHERE enabled = 1', []);
    this.alertRules.clear();
    for (const row of rows || []) {
      const condition = this._parseJson(row.condition, null);
      if (condition === null || typeof condition !== 'object') {
        logger.warn(`[AlertSystem] Skipping rule ${row.rule_id}: invalid condition`);
        continue;
      }
      const channels = this._parseJson(row.notification_channels, []);
      this.alertRules.set(row.rule_id, {
        ...row,
        condition,
        notification_channels: Array.isArray(channels) ? channels : []
      });
    }
    logger.info(`[AlertSystem] Loaded ${this.alertRules.size} alert rules`);
    return this.alertRules.size;
  }

  /**
   * Trigger alert based on anomaly.
   * For every cached 'anomaly' rule whose `condition.anomaly_type` equals
   * `anomaly.type` (and which is not in cooldown) this creates an alert,
   * sends notifications and runs the automated response protocols.
   * @param {Object} anomaly - reads type, description, anomaly_id, confidence,
   *   pattern_data, affected_user_id and affected_ip
   * @returns {Promise<void>}
   */
  async triggerAnomalyAlert(anomaly) {
    if (!anomaly) return;
    const matchingRules = [...this.alertRules.values()].filter(
      (rule) =>
        rule.rule_type === 'anomaly' &&
        rule.condition &&
        rule.condition.anomaly_type === anomaly.type
    );
    if (matchingRules.length === 0) return;

    // The cooldown lookup must use exactly the value stored on the alert row,
    // otherwise an anomaly carrying both a user id and an IP never matches its
    // own previous alert.
    const affectedEntity = anomaly.affected_user_id || anomaly.affected_ip || null;

    for (const rule of matchingRules) {
      if (await this.isInCooldown(rule.rule_id, affectedEntity)) {
        logger.debug(`[AlertSystem] Alert ${rule.rule_id} in cooldown period`);
        continue;
      }
      const alertId = await this.createAlert({
        rule_id: rule.rule_id,
        severity: rule.severity,
        title: rule.name,
        description: anomaly.description,
        affected_entity: affectedEntity,
        source_data: JSON.stringify({
          anomalyId: anomaly.anomaly_id,
          type: anomaly.type,
          confidence: anomaly.confidence,
          // Malformed pattern data must not prevent the alert from being raised.
          patternData: this._parseJson(anomaly.pattern_data, {})
        })
      });
      await this.sendNotifications(alertId, rule.notification_channels);
      // Execute automated response protocols (CWE-778)
      await this.executeResponseProtocols(
        'anomaly',
        { anomaly_type: anomaly.type, severity: rule.severity },
        {
          alertId,
          ip_address: anomaly.affected_ip,
          user_id: anomaly.affected_user_id,
          confidence: anomaly.confidence
        }
      );
    }
  }

  /**
   * Execute automated response protocols.
   * CWE-778: both successful executions and failures are logged.
   * @param {string} triggerType
   * @param {Object} triggerEvent
   * @param {Object} [context]
   * @returns {Promise<Object|undefined>} the manager's result, or `undefined`
   *   when execution threw (the error is logged, not rethrown)
   */
  async executeResponseProtocols(triggerType, triggerEvent, context = {}) {
    try {
      const result = await responseProtocolManager.executeProtocols(triggerType, triggerEvent, context);
      if (result && result.executed) {
        const protocols = Array.isArray(result.protocols) ? result.protocols : [];
        logger.warn(`[AlertSystem] Executed ${protocols.length} response protocol(s)`);
        // Log protocol execution (CWE-778)
        logAggregator.aggregate('alert_system', 'warn', 'security', 'Response protocols executed', {
          triggerType,
          protocolsExecuted: protocols.length,
          protocols: protocols.map((p) => ({
            protocolId: p.protocolId,
            protocolName: p.protocolName,
            status: p.status,
            actionsExecuted: p.actionsExecuted
          })),
          context
        });
      }
      return result;
    } catch (error) {
      logger.error('[AlertSystem] Response protocol execution failed:', error);
      // Log execution failure (CWE-778)
      logAggregator.aggregate('alert_system', 'error', 'security', 'Response protocol execution failed', {
        triggerType,
        error: error.message,
        context
      });
      return undefined;
    }
  }

  /**
   * Trigger alert based on threshold.
   * Evaluates every cached 'threshold' rule for `metric` against `value`.
   * @param {string} metric
   * @param {number} value
   * @returns {Promise<void>}
   */
  async triggerThresholdAlert(metric, value) {
    for (const rule of this.alertRules.values()) {
      if (rule.rule_type !== 'threshold' || !rule.condition || rule.condition.metric !== metric) {
        continue;
      }
      if (!this.evaluateCondition(rule.condition, value)) continue;
      if (await this.isInCooldown(rule.rule_id, metric)) continue;

      const alertId = await this.createAlert({
        rule_id: rule.rule_id,
        severity: rule.severity,
        title: rule.name,
        description: `${rule.description}: ${metric} = ${value}`,
        affected_entity: metric,
        source_data: JSON.stringify({ metric, value, threshold: rule.condition.value })
      });
      await this.sendNotifications(alertId, rule.notification_channels);
    }
  }

  /**
   * Evaluate threshold condition.
   * Both operands are compared as numbers so that numeric strings are not
   * compared lexicographically; a missing or non-numeric operand never passes.
   * @param {{operator: string, value: *}} condition
   * @param {*} value - observed metric value
   * @returns {boolean} false for unknown operators
   */
  evaluateCondition(condition, value) {
    const { operator, value: threshold } = condition;
    if (value === null || value === undefined || threshold === null || threshold === undefined) {
      return false;
    }
    const actual = Number(value);
    const limit = Number(threshold);
    if (Number.isNaN(actual) || Number.isNaN(limit)) return false;
    switch (operator) {
      case '>=': return actual >= limit;
      case '>': return actual > limit;
      case '<=': return actual <= limit;
      case '<': return actual < limit;
      case '==': return actual === limit;
      case '!=': return actual !== limit;
      default: return false;
    }
  }

  /**
   * Check if alert is in cooldown period, i.e. whether an alert for the same
   * rule and entity was created within the rule's `cooldown_minutes`.
   * A missing/invalid cooldown falls back to 10 minutes; an explicit 0
   * disables the cooldown.
   * @param {string} ruleId
   * @param {*} affectedEntity - value stored in security_alerts.affected_entity
   * @returns {Promise<boolean>} false when the rule is not cached
   */
  async isInCooldown(ruleId, affectedEntity) {
    const rule = this.alertRules.get(ruleId);
    if (!rule) return false;

    const raw = rule.cooldown_minutes;
    const parsed = raw === null || raw === undefined ? NaN : Number(raw);
    const cooldownMinutes = Number.isFinite(parsed) && parsed >= 0 ? parsed : 10;
    if (cooldownMinutes === 0) return false;

    // created_at is filled by CURRENT_TIMESTAMP ("YYYY-MM-DD HH:MM:SS", UTC).
    // The cutoff has to use the same layout: the comparison is textual and an
    // ISO "T" separator sorts after the space, which would hide same-day rows.
    const cooldownTime = new Date(Date.now() - cooldownMinutes * 60 * 1000)
      .toISOString()
      .slice(0, 19)
      .replace('T', ' ');

    // IS (rather than =) so that alerts without an entity still match.
    const row = await this._dbGet(
      `SELECT COUNT(*) as count FROM security_alerts
        WHERE rule_id = ?
        AND affected_entity IS ?
        AND created_at >= ?`,
      [ruleId, affectedEntity === undefined ? null : affectedEntity, cooldownTime]
    );
    return Boolean(row) && row.count > 0;
  }

  /**
   * Create alert: insert the row, write log entries, emit 'alert' and track
   * the alert in `activeAlerts`.
   * @param {Object} details - rule_id, severity, title, description,
   *   affected_entity, source_data
   * @returns {Promise<string>} the generated alert id
   */
  async createAlert(details) {
    const alertId = `ALERT-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
    await this._dbRun(
      `INSERT INTO security_alerts
        (alert_id, rule_id, severity, title, description, affected_entity, source_data)
        VALUES (?, ?, ?, ?, ?, ?, ?)`,
      [
        alertId,
        details.rule_id,
        details.severity,
        details.title,
        details.description,
        details.affected_entity,
        details.source_data
      ]
    );
    logger.warn(`[AlertSystem] Alert triggered: ${alertId} - ${details.title} (${details.severity})`);
    // Log to aggregated logs
    logAggregator.aggregate('alert_system', 'warn', 'security', details.title, {
      alertId,
      ruleId: details.rule_id,
      severity: details.severity,
      affectedEntity: details.affected_entity
    });
    // Emit event for real-time notifications
    this.emit('alert', {
      alertId,
      ...details,
      timestamp: new Date().toISOString()
    });
    this.activeAlerts.set(alertId, details);
    return alertId;
  }

  /**
   * Send notifications through configured channels, then flag the alert row
   * with notification_sent = 1. A failing channel is logged and does not stop
   * the remaining channels; a failure to update the flag is logged as well.
   * @param {string} alertId
   * @param {string[]} channels - anything that is not an array is treated as empty
   * @returns {Promise<void>}
   */
  async sendNotifications(alertId, channels) {
    const targets = Array.isArray(channels) ? channels : [];
    for (const channel of targets) {
      try {
        switch (channel) {
          case 'in_app':
            await this.sendInAppNotification(alertId);
            break;
          case 'email':
            await this.sendEmailNotification(alertId);
            break;
          case 'webhook':
            await this.sendWebhookNotification(alertId);
            break;
          default:
            logger.debug(`[AlertSystem] Unknown notification channel: ${channel}`);
        }
      } catch (error) {
        logger.error(`[AlertSystem] Failed to send ${channel} notification:`, error);
      }
    }
    // Mark notification as sent
    try {
      await this._dbRun('UPDATE security_alerts SET notification_sent = 1 WHERE alert_id = ?', [alertId]);
    } catch (error) {
      logger.error(`[AlertSystem] Failed to mark notification as sent for ${alertId}:`, error);
    }
  }

  /**
   * Send in-app notification (emits a 'notification' event).
   * Does nothing when the alert row cannot be found.
   * @param {string} alertId
   * @returns {Promise<void>}
   */
  async sendInAppNotification(alertId) {
    const alert = await this.getAlert(alertId);
    if (!alert) return;
    this.emit('notification', {
      type: 'security_alert',
      alertId: alert.alert_id,
      severity: alert.severity,
      title: alert.title,
      description: alert.description,
      timestamp: alert.created_at
    });
    logger.debug(`[AlertSystem] In-app notification sent: ${alertId}`);
  }

  /**
   * Send email notification (placeholder: only logs).
   * @param {string} alertId
   * @returns {Promise<void>}
   */
  async sendEmailNotification(alertId) {
    const alert = await this.getAlert(alertId);
    if (!alert) return;
    // TODO: Implement email sending (nodemailer)
    logger.info(`[AlertSystem] Email notification (stub): ${alertId}`);
    // For now, just log it
    logger.info(`[AlertSystem] Email would be sent for alert ${alertId}: ${alert.title}`);
  }

  /**
   * Send webhook notification (placeholder: only logs).
   * @param {string} alertId
   * @returns {Promise<void>}
   */
  async sendWebhookNotification(alertId) {
    const alert = await this.getAlert(alertId);
    if (!alert) return;
    // TODO: Implement webhook HTTP POST
    logger.info(`[AlertSystem] Webhook notification (stub): ${alertId}`);
  }

  /**
   * Get alert by ID.
   * @param {string} alertId
   * @returns {Promise<Object|undefined>} the row, or undefined when not found
   */
  async getAlert(alertId) {
    return this._dbGet('SELECT * FROM security_alerts WHERE alert_id = ?', [alertId]);
  }

  /**
   * Get alerts matching the filters, newest first.
   * @param {Object} [filters]
   * @param {string} [filters.status='active']
   * @param {string} [filters.severity]
   * @param {number} [filters.limit=100]
   * @param {number} [filters.offset=0]
   * @returns {Promise<Array>}
   */
  async getAlerts(filters = {}) {
    const { status = 'active', severity, limit = 100, offset = 0 } = filters;
    const whereClause = ['status = ?'];
    const params = [status];
    if (severity) {
      whereClause.push('severity = ?');
      params.push(severity);
    }
    params.push(limit, offset);
    return this._dbAll(
      `SELECT * FROM security_alerts
        WHERE ${whereClause.join(' AND ')}
        ORDER BY created_at DESC
        LIMIT ? OFFSET ?`,
      params
    );
  }

  /**
   * Acknowledge alert: set status 'acknowledged' and record who/when, then
   * log and emit 'alert_acknowledged'. `notes` is only written to the
   * aggregated log, not to the alert row.
   * @param {string} alertId
   * @param {number} userId
   * @param {string} [notes]
   * @returns {Promise<void>}
   */
  async acknowledgeAlert(alertId, userId, notes = '') {
    await this._dbRun(
      `UPDATE security_alerts
        SET status = 'acknowledged',
        acknowledged_at = CURRENT_TIMESTAMP,
        acknowledged_by = ?
        WHERE alert_id = ?`,
      [userId, alertId]
    );
    logger.info(`[AlertSystem] Alert acknowledged: ${alertId} by user ${userId}`);
    logAggregator.aggregate('alert_system', 'info', 'security', 'Alert acknowledged', {
      alertId,
      userId,
      notes
    });
    this.emit('alert_acknowledged', { alertId, userId });
  }

  /**
   * Resolve alert: set status 'resolved', store the resolution notes, drop
   * the alert from `activeAlerts`, then log and emit 'alert_resolved'.
   * @param {string} alertId
   * @param {number} userId
   * @param {string} notes
   * @returns {Promise<void>}
   */
  async resolveAlert(alertId, userId, notes) {
    await this._dbRun(
      `UPDATE security_alerts
        SET status = 'resolved',
        resolved_at = CURRENT_TIMESTAMP,
        resolved_by = ?,
        resolution_notes = ?
        WHERE alert_id = ?`,
      [userId, notes, alertId]
    );
    logger.info(`[AlertSystem] Alert resolved: ${alertId} by user ${userId}`);
    logAggregator.aggregate('alert_system', 'info', 'security', 'Alert resolved', {
      alertId,
      userId,
      notes
    });
    this.activeAlerts.delete(alertId);
    this.emit('alert_resolved', { alertId, userId });
  }

  /**
   * Get alert statistics for the last 24 hours.
   * @returns {Promise<{total: number, byStatus: Object, bySeverity: Object}>}
   */
  async getStatistics() {
    const rows = await this._dbAll(
      `SELECT
        status,
        severity,
        COUNT(*) as count
        FROM security_alerts
        WHERE created_at >= datetime('now', '-24 hours')
        GROUP BY status, severity`,
      []
    );
    const stats = { total: 0, byStatus: {}, bySeverity: {} };
    for (const row of rows || []) {
      stats.total += row.count;
      stats.byStatus[row.status] = (stats.byStatus[row.status] || 0) + row.count;
      stats.bySeverity[row.severity] = (stats.bySeverity[row.severity] || 0) + row.count;
    }
    return stats;
  }
}
// Create singleton instance
const alertSystem = new AlertSystem();
// Connect security intelligence to alert system
const securityIntelligence = require('./securityIntelligence');

// Guards against overlapping polls when one check takes longer than the
// polling interval.
let alertCheckInProgress = false;

// Poll for open anomalies and the current threat score once a minute and
// forward them to the alert system.
setInterval(async () => {
  if (alertCheckInProgress) return;
  alertCheckInProgress = true;
  try {
    const anomalies = await securityIntelligence.getAnomalies({ status: 'open', limit: 50 });
    for (const anomaly of Array.isArray(anomalies) ? anomalies : []) {
      try {
        await alertSystem.triggerAnomalyAlert(anomaly);
      } catch (error) {
        // One bad anomaly must not block the remaining ones or the
        // threat-score check below.
        logger.error('[AlertSystem] Error checking for alerts:', error);
      }
    }
    // Check threat score. The threshold itself lives in the alert rules and is
    // evaluated by triggerThresholdAlert, so it is not duplicated here.
    const threatScore = securityIntelligence.threatScore;
    if (threatScore !== undefined && threatScore !== null) {
      await alertSystem.triggerThresholdAlert('threat_score', threatScore);
    }
  } catch (error) {
    logger.error('[AlertSystem] Error checking for alerts:', error);
  } finally {
    alertCheckInProgress = false;
  }
}, 60000); // Check every minute
module.exports = alertSystem;