DevSecOps Security Metrics and Reporting
Overview
DevSecOps security metrics and reporting provide the visibility and accountability necessary to maintain and improve security posture in continuous delivery environments. This article explores how to establish meaningful security metrics, create effective reporting mechanisms, and build dashboards that enable data-driven security decisions.
Security Metrics Framework
Defining Security Metrics
Key Security Indicators
Security metrics in DevSecOps environments must balance security effectiveness with development velocity:
# Example: Security metrics framework
import time
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import statistics
from dataclasses import dataclass
from enum import Enum
class SecurityMetricType(Enum):
    """
    Categories a security metric can belong to
    """

    # The string values double as grouping keys when reports bucket metrics by type.
    VULNERABILITY = "vulnerability"
    INCIDENT = "incident"
    COMPLIANCE = "compliance"
    PROCESS = "process"
    PERFORMANCE = "performance"
@dataclass
class SecurityMetric:
    """
    A single named security measurement taken at a point in time
    """

    name: str
    value: float
    unit: str
    timestamp: datetime
    metric_type: SecurityMetricType
    # Free-form labels; remains None (not an empty dict) when none are supplied,
    # so the annotation must admit None.
    tags: Optional[Dict[str, str]] = None
    description: str = ""
class SecurityMetricsCollector:
    """
    Collect and aggregate security metrics
    """

    def __init__(self):
        """
        Start with empty metric storage, then load the threshold and KPI tables
        """
        self.metrics, self.metrics_store = [], {}
        self.alert_thresholds = self.define_alert_thresholds()
        self.kpi_definitions = self.define_kpi_definitions()
def define_alert_thresholds(self) -> Dict[str, Dict[str, float]]:
    """
    Build the alert threshold table for security metrics

    Returns a mapping of threshold group name to a mapping of level name
    to numeric limit.
    """
    # Vulnerability count limits per severity
    vulnerability_limits = {'critical': 0, 'high': 5, 'medium': 20, 'low': 50}
    # Remediation time limits per severity, in hours (168 hours = 1 week)
    remediation_limits = {'critical': 24, 'high': 72, 'medium': 168}
    # Incident volume limits per reporting window
    incident_limits = {'daily_threshold': 5, 'weekly_threshold': 20}
    # Compliance floor, as a percentage
    compliance_limits = {'minimum_acceptable': 95.0}

    return {
        'vulnerability_count': vulnerability_limits,
        'mean_time_to_remediate': remediation_limits,
        'security_incidents': incident_limits,
        'compliance_score': compliance_limits,
    }
def define_kpi_definitions(self) -> Dict[str, Dict[str, Any]]:
    """
    Build the table of Key Performance Indicators for security

    Each entry carries a display name, a description, the formula as
    human-readable text, a numeric target and the unit of that target.
    """
    def kpi(name, description, formula, target, unit):
        # Keeps every definition's key set and key order uniform.
        return {
            'name': name,
            'description': description,
            'formula': formula,
            'target': target,
            'unit': unit,
        }

    return {
        'security_velocity': kpi(
            'Security Velocity',
            'Rate of security improvements vs new vulnerabilities',
            '(vulnerabilities_fixed / vulnerabilities_found) * 100',
            100.0,
            'percentage',
        ),
        'mean_time_to_detect': kpi(
            'Mean Time to Detect (MTTD)',
            'Average time to detect security incidents',
            'average(time_from_occurrence_to_detection)',
            1.0,  # hours
            'hours',
        ),
        'mean_time_to_remediate': kpi(
            'Mean Time to Remediate (MTTR)',
            'Average time to fix security vulnerabilities',
            'average(time_from_detection_to_remediation)',
            24.0,  # hours for critical
            'hours',
        ),
        'security_coverage': kpi(
            'Security Test Coverage',
            'Percentage of code covered by security tests',
            '(security_tested_lines / total_code_lines) * 100',
            80.0,
            'percentage',
        ),
        'compliance_score': kpi(
            'Compliance Score',
            'Overall compliance with security standards',
            '(compliant_items / total_items) * 100',
            95.0,
            'percentage',
        ),
        'security_awareness': kpi(
            'Security Awareness',
            'Team security training completion rate',
            '(trained_team_members / total_team_members) * 100',
            100.0,
            'percentage',
        ),
    }
def collect_vulnerability_metrics(self, scan_results: Dict[str, Any]) -> List[SecurityMetric]:
    """
    Collect vulnerability-related metrics

    Emits one ``vulnerability_count_<severity>`` metric per entry of
    ``scan_results['vulnerability_summary']``, followed by a single
    ``vulnerability_density`` metric (vulnerabilities per 1000 scanned lines).
    ``code_lines_scanned`` defaults to 100000 when the key is absent; a
    missing-valued or non-positive line count yields a density of 0 rather
    than raising ZeroDivisionError.
    """
    vuln_counts = scan_results.get('vulnerability_summary', {})
    # One timestamp for the whole batch so metrics from the same scan line up.
    collected_at = datetime.utcnow()

    metrics = [
        SecurityMetric(
            name=f'vulnerability_count_{severity}',
            value=count,
            unit='count',
            timestamp=collected_at,
            metric_type=SecurityMetricType.VULNERABILITY,
            tags={'severity': severity},
            description=f'Number of {severity} severity vulnerabilities found'
        )
        for severity, count in vuln_counts.items()
    ]

    # Vulnerability density, per 1000 lines of code
    total_vulns = sum(vuln_counts.values())
    code_lines = scan_results.get('code_lines_scanned', 100000)  # Default to 100k lines
    if code_lines and code_lines > 0:
        vuln_density = (total_vulns / code_lines) * 1000
    else:
        vuln_density = 0

    metrics.append(SecurityMetric(
        name='vulnerability_density',
        value=vuln_density,
        unit='vulnerabilities_per_kloc',
        timestamp=collected_at,
        metric_type=SecurityMetricType.VULNERABILITY,
        description='Vulnerabilities per 1000 lines of code'
    ))
    return metrics
def collect_incident_metrics(self, incident_data: Dict[str, Any]) -> List[SecurityMetric]:
    """
    Collect incident-related metrics

    Emits one ``incidents_count_<severity>`` metric per severity found in
    ``incident_data['incidents']`` (incidents without a severity are counted
    as ``unknown``), then ``mean_time_to_detect`` and
    ``mean_time_to_remediate`` in hours.

    Timestamps are read as ISO-8601 strings. Incidents lacking ``occurred_at``
    or ``detected_at`` still count toward the severity totals but are left out
    of the timing averages instead of raising KeyError. Incidents without
    ``resolved_at`` are measured up to the collection time.
    """
    incidents = incident_data.get('incidents') or []
    # One timestamp for the whole batch; also the "still open" end time.
    collected_at = datetime.utcnow()

    # Count incidents by severity
    severity_counts: Dict[str, int] = {}
    for incident in incidents:
        severity = incident.get('severity', 'unknown')
        severity_counts[severity] = severity_counts.get(severity, 0) + 1

    metrics = [
        SecurityMetric(
            name=f'incidents_count_{severity}',
            value=count,
            unit='count',
            timestamp=collected_at,
            metric_type=SecurityMetricType.INCIDENT,
            tags={'severity': severity},
            description=f'Number of {severity} severity incidents'
        )
        for severity, count in severity_counts.items()
    ]

    # Gather detection and remediation durations, in hours
    detection_times = []
    remediation_times = []
    for incident in incidents:
        occurred_raw = incident.get('occurred_at')
        detected_raw = incident.get('detected_at')
        if not occurred_raw or not detected_raw:
            # Both endpoints are required to measure either duration.
            continue
        occurred_at = datetime.fromisoformat(occurred_raw)
        detected_at = datetime.fromisoformat(detected_raw)
        resolved_raw = incident.get('resolved_at')
        resolved_at = datetime.fromisoformat(resolved_raw) if resolved_raw else collected_at
        detection_times.append((detected_at - occurred_at).total_seconds() / 3600)
        remediation_times.append((resolved_at - detected_at).total_seconds() / 3600)

    # The two lists always grow together, so one emptiness check covers both.
    if detection_times:
        metrics.append(SecurityMetric(
            name='mean_time_to_detect',
            value=statistics.mean(detection_times),
            unit='hours',
            timestamp=collected_at,
            metric_type=SecurityMetricType.PERFORMANCE,
            description='Average time to detect security incidents'
        ))
        metrics.append(SecurityMetric(
            name='mean_time_to_remediate',
            value=statistics.mean(remediation_times),
            unit='hours',
            timestamp=collected_at,
            metric_type=SecurityMetricType.PERFORMANCE,
            description='Average time to remediate security incidents'
        ))
    return metrics
def collect_compliance_metrics(self, compliance_data: Dict[str, Any]) -> List[SecurityMetric]:
    """
    Collect compliance-related metrics

    Produces a ``compliance_score`` percentage metric followed by one
    ``compliance_violations_<type>`` count metric per entry of
    ``compliance_data['violations_by_type']``.
    """
    passed = compliance_data.get('compliant_items', 0)
    checked = compliance_data.get('total_items', 1)
    if checked > 0:
        score = (passed / checked) * 100
    else:
        # Nothing was checked, so report zero rather than divide by zero.
        score = 0

    collected = [SecurityMetric(
        name='compliance_score',
        value=score,
        unit='percentage',
        timestamp=datetime.utcnow(),
        metric_type=SecurityMetricType.COMPLIANCE,
        description='Overall compliance with security standards'
    )]

    # One count metric per violation type
    collected.extend(
        SecurityMetric(
            name=f'compliance_violations_{violation_type}',
            value=count,
            unit='count',
            timestamp=datetime.utcnow(),
            metric_type=SecurityMetricType.COMPLIANCE,
            tags={'violation_type': violation_type},
            description=f'Number of {violation_type} compliance violations'
        )
        for violation_type, count in compliance_data.get('violations_by_type', {}).items()
    )
    return collected
def collect_process_metrics(self, process_data: Dict[str, Any]) -> List[SecurityMetric]:
    """
    Collect security process metrics

    Always produces test-coverage and training-completion percentages; adds
    the mean security review time only when review times were supplied.
    """
    def as_percentage(part, whole):
        # A non-positive denominator reports 0 instead of dividing.
        return (part / whole) * 100 if whole > 0 else 0

    def process_metric(name, value, unit, description):
        return SecurityMetric(
            name=name,
            value=value,
            unit=unit,
            timestamp=datetime.utcnow(),
            metric_type=SecurityMetricType.PROCESS,
            description=description
        )

    collected = [
        process_metric(
            'security_test_coverage',
            as_percentage(
                process_data.get('security_tested_lines', 0),
                process_data.get('total_lines', 1),
            ),
            'percentage',
            'Percentage of code covered by security tests',
        ),
        process_metric(
            'security_training_completion',
            as_percentage(
                process_data.get('trained_team_members', 0),
                process_data.get('total_team_members', 1),
            ),
            'percentage',
            'Team security training completion rate',
        ),
    ]

    review_times = process_data.get('security_review_times', [])
    if review_times:
        collected.append(process_metric(
            'mean_time_to_security_review',
            statistics.mean(review_times),
            'hours',
            'Average time to complete security reviews',
        ))
    return collected
def calculate_kpi(self, kpi_name: str, data: Dict[str, Any]) -> float:
    """
    Calculate a specific KPI value

    ``kpi_name`` must be a key of ``self.kpi_definitions``; otherwise
    ValueError is raised. ``data`` supplies the raw inputs for that KPI.
    Ratio KPIs are returned as percentages and report 0 when their
    denominator is not positive (this now includes ``security_velocity``,
    which previously raised ZeroDivisionError). Mean KPIs report 0 when no
    samples are supplied. A defined KPI without a calculator returns 0.0.
    """
    if not self.kpi_definitions.get(kpi_name):
        raise ValueError(f"KPI {kpi_name} not defined")

    def ratio_percentage(numerator_key, denominator_key):
        numerator = data.get(numerator_key, 0)
        denominator = data.get(denominator_key, 1)
        return (numerator / denominator) * 100 if denominator > 0 else 0.0

    def mean_of(samples_key):
        samples = data.get(samples_key, [])
        return statistics.mean(samples) if samples else 0.0

    # Calculators are thunks so only the requested KPI touches ``data``.
    calculators = {
        'security_velocity': lambda: ratio_percentage('vulnerabilities_fixed', 'vulnerabilities_found'),
        'mean_time_to_detect': lambda: mean_of('detection_times'),
        'mean_time_to_remediate': lambda: mean_of('remediation_times'),
        'security_coverage': lambda: ratio_percentage('security_tested_lines', 'total_lines'),
        'compliance_score': lambda: ratio_percentage('compliant_items', 'total_items'),
        'security_awareness': lambda: ratio_percentage('trained_team_members', 'total_team_members'),
    }
    calculator = calculators.get(kpi_name)
    return calculator() if calculator is not None else 0.0
def generate_security_report(self, time_range: str = '24h',
                             kpi_inputs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Generate comprehensive security report

    ``time_range`` selects the look-back window: ``'24h'``, ``'7d'`` or
    ``'30d'``; any other value falls back to one day. ``kpi_inputs`` is the
    raw data handed to ``calculate_kpi`` for every KPI; when omitted an empty
    dict is used, so each KPI evaluates to its no-data value.

    Metrics are converted to plain dicts once, and that same serialized form
    is both placed in the report and passed to ``generate_summary``, whose
    helpers index metrics by key (``m['name']``, ``m['value']``).

    A KPI whose unit is ``'hours'`` is treated as lower-is-better and is
    ``'met'`` when its value is at or below target; every other KPI is
    ``'met'`` when at or above target. With no inputs an hours-based KPI
    evaluates to 0 and is therefore reported as ``'met'``.
    """
    # Sample the clock once so window bounds and report stamps agree.
    now = datetime.utcnow()
    windows = {
        '24h': timedelta(hours=24),
        '7d': timedelta(days=7),
        '30d': timedelta(days=30),
    }
    start_time = now - windows.get(time_range, timedelta(days=1))

    # Aggregate metrics within time range
    relevant_metrics = [m for m in self.metrics if m.timestamp >= start_time]

    # Group metrics by type, already serialized to plain dicts
    serialized_by_type: Dict[str, List[Dict[str, Any]]] = {}
    for m in relevant_metrics:
        serialized_by_type.setdefault(m.metric_type.value, []).append({
            'name': m.name,
            'value': m.value,
            'unit': m.unit,
            'timestamp': m.timestamp.isoformat(),
            'tags': m.tags
        })

    # Calculate KPIs
    inputs = kpi_inputs or {}
    kpi_data = {}
    for kpi_name, definition in self.kpi_definitions.items():
        target = definition['target']
        try:
            kpi_value = self.calculate_kpi(kpi_name, inputs)
        except Exception as e:
            # Best effort: one failing KPI must not sink the whole report.
            kpi_data[kpi_name] = {
                'value': 0,
                'target': target,
                'status': 'error',
                'error': str(e)
            }
            continue
        lower_is_better = definition.get('unit') == 'hours'
        met = kpi_value <= target if lower_is_better else kpi_value >= target
        kpi_data[kpi_name] = {
            'value': kpi_value,
            'target': target,
            'status': 'met' if met else 'not_met'
        }

    return {
        'report_timestamp': now.isoformat(),
        'time_range': time_range,
        'start_time': start_time.isoformat(),
        'end_time': now.isoformat(),
        'metrics_by_type': serialized_by_type,
        'kpis': kpi_data,
        'trending': self.calculate_trends(relevant_metrics),
        'alerts': self.check_alerts(relevant_metrics),
        'summary': self.generate_summary(serialized_by_type, kpi_data)
    }
def calculate_trends(self, metrics: List[SecurityMetric]) -> Dict[str, Any]:
    """
    Calculate trends for security metrics

    Metrics are grouped by name; any name with at least two samples gets a
    trend entry comparing its earliest and latest values by timestamp.
    Names with a single sample are omitted from the result.
    """
    # Bucket samples under their metric name
    series_by_name = {}
    for sample in metrics:
        series_by_name.setdefault(sample.name, []).append(sample)

    trends = {}
    for metric_name, series in series_by_name.items():
        if len(series) < 2:
            continue

        ordered = sorted(series, key=lambda s: s.timestamp)
        first_value = ordered[0].value
        last_value = ordered[-1].value
        change = last_value - first_value

        # Direction is decided by the endpoints only, not by a fitted slope.
        if last_value > first_value:
            direction = 'increasing'
        elif last_value < first_value:
            direction = 'decreasing'
        else:
            direction = 'stable'

        trends[metric_name] = {
            'direction': direction,
            'first_value': first_value,
            'last_value': last_value,
            'change': change,
            # A zero starting value has no meaningful percentage; report 0.
            'change_percentage': (change / first_value * 100) if first_value != 0 else 0
        }
    return trends
def check_alerts(self, metrics: List[SecurityMetric]) -> List[Dict[str, Any]]:
    """
    Check for metrics that breach alert thresholds

    Thresholds come from ``self.alert_thresholds`` (group name -> level ->
    limit) and are matched to a metric in one of two ways:

    * the metric name equals a group name (e.g. ``compliance_score``), in
      which case every level of that group is checked; or
    * the metric name is ``<group>_<level>`` (e.g.
      ``vulnerability_count_critical``), in which case only that level is
      checked.

    Levels whose name starts with ``minimum`` are floors and alert when the
    value falls below the limit; all other levels are ceilings and alert
    when the value exceeds it. Metrics matching no group produce no alerts.
    """
    alerts = []
    for metric in metrics:
        if metric.name in self.alert_thresholds:
            applicable = self.alert_thresholds[metric.name]
        else:
            # Per-severity metrics carry their level as the final name segment.
            group_name, _, level_name = metric.name.rpartition('_')
            group = self.alert_thresholds.get(group_name, {})
            applicable = {level_name: group[level_name]} if level_name in group else {}

        for level, threshold_value in applicable.items():
            if level.startswith('minimum'):
                breached = metric.value < threshold_value
            else:
                breached = metric.value > threshold_value
            if breached:
                alerts.append({
                    'metric_name': metric.name,
                    'current_value': metric.value,
                    'threshold_level': level,
                    'threshold_value': threshold_value,
                    'timestamp': metric.timestamp.isoformat(),
                    'severity': 'high' if level in ['critical', 'daily_threshold'] else 'medium'
                })
    return alerts
def generate_summary(self, metrics_by_type: Dict[str, List[Dict[str, Any]]], kpis: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """
    Generate summary of security metrics

    Combines overall counts with the vulnerability, incident and trend
    summaries produced by the corresponding ``calculate_*_summary`` methods.
    """
    metric_total = 0
    for group in metrics_by_type.values():
        metric_total += len(group)

    met_total = sum(1 for details in kpis.values() if details.get('status') == 'met')
    compliance_entry = kpis.get('compliance_score', {})

    summary = {}
    summary['total_metrics'] = metric_total
    summary['kpis_met'] = met_total
    summary['kpis_total'] = len(kpis)
    summary['compliance_score'] = compliance_entry.get('value', 0)
    summary['vulnerability_status'] = self.calculate_vulnerability_summary(
        metrics_by_type.get('vulnerability', []))
    summary['incident_status'] = self.calculate_incident_summary(
        metrics_by_type.get('incident', []))
    summary['trend_summary'] = self.calculate_trend_summary(metrics_by_type)
    return summary
def calculate_vulnerability_summary(self, vulnerability_metrics: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Calculate vulnerability summary

    Sums metric values per severity, attributing a metric to a severity
    when the severity word appears anywhere in the metric's name, and
    derives an overall risk status from the critical/high/medium totals.
    """
    def total_for(label):
        return sum(entry['value'] for entry in vulnerability_metrics if label in entry['name'])

    critical, high, medium, low = (
        total_for(label) for label in ('critical', 'high', 'medium', 'low')
    )

    if critical > 0 or high > 10:
        status = 'high_risk'
    elif high > 0 or medium > 20:
        status = 'medium_risk'
    else:
        status = 'low_risk'

    return {
        'critical': critical,
        'high': high,
        'medium': medium,
        'low': low,
        'total': critical + high + medium + low,
        'status': status
    }
def calculate_incident_summary(self, incident_metrics: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Calculate incident summary

    Sums metric values per severity (matched by the severity word appearing
    in the metric name) and reports the most severe level that has a
    non-zero total, defaulting to ``'low'``.
    """
    severities = ('critical', 'high', 'medium', 'low')
    totals = {}
    for label in severities:
        totals[label] = sum(
            entry['value'] for entry in incident_metrics if label in entry['name']
        )

    # The first severity (most severe first) with any incidents sets the status.
    for label in severities[:-1]:
        if totals[label] > 0:
            status = label
            break
    else:
        status = 'low'

    summary = dict(totals)
    summary['total'] = totals['critical'] + totals['high'] + totals['medium'] + totals['low']
    summary['status'] = status
    return summary
def calculate_trend_summary(self, metrics_by_type: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
    """
    Calculate overall trend summary

    This is a placeholder heuristic rather than a real trend computation:
    every metric whose name contains ``density`` or ``count`` is scored as
    positive when its value is below 10 and negative otherwise; all other
    metrics are ignored.

    Returns the two tallies plus ``overall_trend``, which is ``'improving'``
    when positives outnumber negatives, ``'declining'`` when negatives
    outnumber positives, and ``'stable'`` on a tie (including when no metric
    was scored at all).
    """
    favorable_below = 10  # arbitrary threshold; lower is better for these metrics
    positive_trends = 0
    negative_trends = 0
    for metric_list in metrics_by_type.values():
        for metric in metric_list:
            if 'density' not in metric['name'] and 'count' not in metric['name']:
                continue
            if metric['value'] < favorable_below:
                positive_trends += 1
            else:
                negative_trends += 1

    if positive_trends > negative_trends:
        overall_trend = 'improving'
    elif positive_trends < negative_trends:
        overall_trend = 'declining'
    else:
        overall_trend = 'stable'

    return {
        'positive_trends': positive_trends,
        'negative_trends': negative_trends,
        'overall_trend': overall_trend
    }
# Example usage
def run_security_metrics_example():
    """
    Walk through collecting sample metrics and printing a report summary

    Feeds one simulated vulnerability scan and two simulated incidents into
    a fresh collector, generates the 24-hour report, prints its headline
    figures and returns the full report dict.
    """
    collector = SecurityMetricsCollector()

    # Simulated vulnerability scan output
    vuln_scan_results = {
        'vulnerability_summary': {'critical': 2, 'high': 8, 'medium': 15, 'low': 25},
        'code_lines_scanned': 50000,
    }
    collector.metrics.extend(collector.collect_vulnerability_metrics(vuln_scan_results))

    def hours_ago(hours):
        return (datetime.utcnow() - timedelta(hours=hours)).isoformat()

    # Simulated incidents: one resolved just now, one resolved five hours ago
    incident_data = {
        'incidents': [
            {
                'severity': 'high',
                'occurred_at': hours_ago(2),
                'detected_at': hours_ago(1),
                'resolved_at': datetime.utcnow().isoformat(),
            },
            {
                'severity': 'medium',
                'occurred_at': hours_ago(10),
                'detected_at': hours_ago(8),
                'resolved_at': hours_ago(5),
            },
        ]
    }
    collector.metrics.extend(collector.collect_incident_metrics(incident_data))

    # Generate report
    report = collector.generate_security_report('24h')
    summary = report['summary']

    print("Security Report Summary:")
    print(f" Compliance Score: {summary['compliance_score']:.2f}%")
    print(f" Vulnerability Status: {summary['vulnerability_status']['status']}")
    print(f" Incident Status: {summary['incident_status']['status']}")
    print(f" KPIs Met: {summary['kpis_met']}/{summary['kpis_total']}")
    return report
# Run example
# report = run_security_metrics_example()
Security Scorecards
Creating comprehensive security scorecards:
# Example: Security scorecard system
from dataclasses import dataclass
from typing import Dict, List, Any
import json
from datetime import datetime
@dataclass
class SecurityScorecard:
    """
    Security scorecard for teams/projects
    """

    # Who and what the scorecard describes, and when it was produced
    team_name: str
    project_name: str
    scorecard_date: datetime

    # Per-category results, keyed by category name
    categories: Dict[str, Dict[str, Any]]

    # Aggregate outcome derived from the categories
    overall_score: float
    risk_level: str
    recommendations: List[str]
class SecurityScorecardGenerator:
    """
    Generate security scorecards for teams and projects
    """

    def __init__(self):
        """
        Load the scorecard templates and the per-category scoring weights
        """
        templates = self.define_scorecard_templates()
        self.scorecard_templates = templates
        weights = self.define_scoring_weights()
        self.scoring_weights = weights
def define_scorecard_templates(self) -> Dict[str, Dict[str, Any]]:
    """
    Define security scorecard templates

    Returns one template per scorecard kind, each listing its category
    names and a maximum score of 100.
    """
    categories_by_kind = {
        'development_team': [
            'Secure Coding Practices',
            'Security Testing',
            'Vulnerability Management',
            'Security Awareness',
            'Compliance',
        ],
        'application': [
            'Code Security',
            'Dependency Security',
            'Runtime Security',
            'Configuration Security',
            'Monitoring & Logging',
        ],
        'infrastructure': [
            'Network Security',
            'Identity & Access Management',
            'Data Protection',
            'Infrastructure Security',
            'Compliance',
        ],
    }
    return {
        kind: {'categories': names, 'max_score': 100}
        for kind, names in categories_by_kind.items()
    }
def define_scoring_weights(self) -> Dict[str, Dict[str, float]]:
    """
    Define scoring weights for different categories

    Returns, per scorecard kind, a mapping of category name to its weight.
    """
    # (scorecard kind, category names, weights in the same order)
    layout = (
        (
            'development_team',
            ('Secure Coding Practices', 'Security Testing', 'Vulnerability Management',
             'Security Awareness', 'Compliance'),
            (0.25, 0.25, 0.20, 0.20, 0.10),
        ),
        (
            'application',
            ('Code Security', 'Dependency Security', 'Runtime Security',
             'Configuration Security', 'Monitoring & Logging'),
            (0.30, 0.25, 0.20, 0.15, 0.10),
        ),
        (
            'infrastructure',
            ('Network Security', 'Identity & Access Management', 'Data Protection',
             'Infrastructure Security', 'Compliance'),
            (0.25, 0.25, 0.20, 0.20, 0.10),
        ),
    )
    return {kind: dict(zip(names, weights)) for kind, names, weights in layout}
def calculate_team_scorecard(self, team_data: Dict[str, Any]) -> SecurityScorecard:
    """
    Calculate security scorecard for a development team

    Scores the five development-team categories, weights them, and wraps the
    result in a SecurityScorecard stamped with the current UTC time.

    Weights are read from ``self.scoring_weights['development_team']`` so
    the configured table is the single source of truth; the fallback weight
    listed per category below is used only when that table has no entry for
    it. ``team_name`` and ``project_name`` default to ``'Unknown'``.
    """
    configured_weights = self.scoring_weights.get('development_team', {})

    # (category name, team_data key with its details, scorer, fallback weight)
    category_plan = [
        ('Secure Coding Practices', 'secure_coding_practices',
         self.calculate_secure_coding_score, 0.25),
        ('Security Testing', 'security_testing',
         self.calculate_security_testing_score, 0.25),
        ('Vulnerability Management', 'vulnerability_management',
         self.calculate_vulnerability_management_score, 0.20),
        ('Security Awareness', 'security_awareness',
         self.calculate_security_awareness_score, 0.20),
        ('Compliance', 'compliance',
         self.calculate_compliance_score, 0.10),
    ]

    categories = {}
    for category_name, details_key, scorer, fallback_weight in category_plan:
        categories[category_name] = {
            'score': scorer(team_data),
            'weight': configured_weights.get(category_name, fallback_weight),
            'details': team_data.get(details_key, {})
        }

    # Weighted sum of the category scores
    overall_score = sum(
        cat_data['score'] * cat_data['weight']
        for cat_data in categories.values()
    )
    risk_level = self.determine_risk_level(overall_score)
    recommendations = self.generate_recommendations(categories, overall_score)

    return SecurityScorecard(
        team_name=team_data.get('team_name', 'Unknown'),
        project_name=team_data.get('project_name', 'Unknown'),
        scorecard_date=datetime.utcnow(),
        categories=categories,
        overall_score=overall_score,
        risk_level=risk_level,
        recommendations=recommendations
    )
def calculate_secure_coding_score(self, team_data: Dict[str, Any]) -> float:
    """
    Calculate secure coding practices score

    Reads ``team_data['secure_coding_practices']`` and awards up to 20
    points for each of five items: security-focused code reviews, static
    analysis tooling, the share of developers trained, security
    requirements in user stories, and threat modeling. The result is the
    earned share of the available points as a 0-100 value.

    The trained-developer share is clamped to the range 0..1, so it can
    never contribute more than its 20 points, and a non-positive
    ``total_developers`` contributes 0 instead of raising ZeroDivisionError.
    """
    practices = team_data.get('secure_coding_practices', {})
    points_per_item = 20
    score = 0
    total_possible = 0

    # Code review with security focus, then static analysis tools usage
    for flag in ('security_code_reviews', 'static_analysis_tools'):
        if practices.get(flag, False):
            score += points_per_item
        total_possible += points_per_item

    # Secure coding training, scored proportionally
    trained_developers = practices.get('trained_developers', 0)
    total_developers = practices.get('total_developers', 1)
    if total_developers > 0:
        trained_share = min(1.0, max(0.0, trained_developers / total_developers))
    else:
        trained_share = 0.0
    score += trained_share * points_per_item
    total_possible += points_per_item

    # Security requirements in user stories, then threat modeling
    for flag in ('security_requirements', 'threat_modeling'):
        if practices.get(flag, False):
            score += points_per_item
        total_possible += points_per_item

    return min(100, (score / total_possible) * 100) if total_possible > 0 else 0
def calculate_security_testing_score(self, team_data: Dict[str, Any]) -> float:
    """
    Calculate security testing score

    Reads ``team_data['security_testing']``. Four yes/no practices earn
    fixed points and the security test coverage percentage earns up to 20
    more; the result is the earned share of the available points as a
    percentage, capped at 100.
    """
    testing = team_data.get('security_testing', {})

    # (flag in the testing data, points awarded when it is truthy)
    practice_points = (
        ('ci_cd_security_tests', 25),   # automated security testing in CI/CD
        ('dast_scans', 20),             # dynamic application security testing
        ('iast_implementation', 15),    # interactive application security testing
        ('penetration_testing', 20),    # penetration testing
    )

    score = 0
    total_possible = 0
    for flag, points in practice_points:
        if testing.get(flag, False):
            score += points
        total_possible += points

    # Coverage is scaled onto 20 points and capped there
    coverage = testing.get('security_test_coverage', 0)
    score += min(20, (coverage / 100) * 20)
    total_possible += 20

    if total_possible > 0:
        return min(100, (score / total_possible) * 100)
    return 0
def calculate_vulnerability_management_score(self, team_data: Dict[str, Any]) -> float:
    """
    Calculate vulnerability management score

    Reads ``team_data['vulnerability_management']``. Fix times (in hours)
    earn tiered points, the remediation rate percentage earns up to 20
    points, and two yes/no practices earn fixed points. Missing fix times
    are treated as infinite and earn nothing. The result is the earned
    share of the available points as a percentage, capped at 100.
    """
    vuln_mgmt = team_data.get('vulnerability_management', {})

    def tiered_points(hours, tiers):
        # Tiers are (limit in hours, points), fastest first; first match wins.
        for limit, points in tiers:
            if hours <= limit:
                return points
        return 0

    score = 0
    total_possible = 0

    # Time to fix critical vulnerabilities: 24h, 72h, 1 week
    critical_fix_time = vuln_mgmt.get('mean_time_to_fix_critical', float('inf'))
    score += tiered_points(critical_fix_time, ((24, 25), (72, 15), (168, 5)))
    total_possible += 25

    # Time to fix high vulnerabilities: 72h, 1 week
    high_fix_time = vuln_mgmt.get('mean_time_to_fix_high', float('inf'))
    score += tiered_points(high_fix_time, ((72, 20), (168, 10)))
    total_possible += 20

    # Vulnerability remediation rate, scaled onto 20 points and capped there
    remediation_rate = vuln_mgmt.get('remediation_rate', 0)
    score += min(20, (remediation_rate / 100) * 20)
    total_possible += 20

    # Vulnerability tracking system
    score += 15 if vuln_mgmt.get('tracking_system', False) else 0
    total_possible += 15

    # Regular vulnerability assessments
    score += 20 if vuln_mgmt.get('regular_assessments', False) else 0
    total_possible += 20

    if total_possible > 0:
        return min(100, (score / total_possible) * 100)
    return 0
def calculate_security_awareness_score(self, team_data: Dict[str, Any]) -> float:
    """
    Calculate security awareness score

    Reads ``team_data['security_awareness']``. The training completion
    percentage earns up to 30 points and four yes/no practices earn fixed
    points; the result is the earned share of the available points as a
    percentage, capped at 100.
    """
    awareness = team_data.get('security_awareness', {})

    # Security training completion, scaled onto 30 points and capped there
    training_completion = awareness.get('training_completion_rate', 0)
    score = 0
    score += min(30, (training_completion / 100) * 30)
    total_possible = 30

    # (flag in the awareness data, points awarded when it is truthy)
    practice_points = (
        ('security_champions', 20),            # security champions program
        ('awareness_campaigns', 15),           # security awareness campaigns
        ('incident_reporting', 20),            # security incident reporting
        ('best_practices_documentation', 15),  # best practices documentation
    )
    for flag, points in practice_points:
        total_possible += points
        if awareness.get(flag, False):
            score += points

    if total_possible > 0:
        return min(100, (score / total_possible) * 100)
    return 0
def calculate_compliance_score(self, team_data: Dict[str, Any]) -> float:
    """
    Calculate compliance score

    Reads ``team_data['compliance']``. Three percentage inputs (framework
    adherence, latest audit score, policy compliance rate) are scaled onto
    30, 25 and 20 points respectively, and two yes/no practices earn 15 and
    10 points. The result is the earned share of the available points as a
    percentage, capped at 100.
    """
    compliance = team_data.get('compliance', {})

    def scaled(percent, cap):
        # Map a 0-100 input onto ``cap`` points without exceeding the cap.
        return min(cap, (percent / 100) * cap)

    score = 0
    total_possible = 0

    # (key holding a percentage, maximum points it can earn)
    for key, cap in (
        ('framework_adherence', 30),
        ('latest_audit_score', 25),
        ('policy_compliance_rate', 20),
    ):
        score += scaled(compliance.get(key, 0), cap)
        total_possible += cap

    # (flag, points awarded when truthy): automation, then regular reviews
    for flag, points in (('compliance_automation', 15), ('regular_reviews', 10)):
        if compliance.get(flag, False):
            score += points
        total_possible += points

    if total_possible > 0:
        return min(100, (score / total_possible) * 100)
    return 0
def determine_risk_level(self, overall_score: float) -> str:
    """
    Determine risk level based on overall score

    Scores of 90 and above are ``'low'`` risk, 70 and above ``'medium'``,
    50 and above ``'high'``; anything lower is ``'critical'``.
    """
    # (minimum score, risk level), highest band first
    bands = ((90, 'low'), (70, 'medium'), (50, 'high'))
    for floor, level in bands:
        if overall_score >= floor:
            return level
    return 'critical'
def generate_recommendations(self, categories: Dict[str, Dict[str, Any]], overall_score: float) -> List[str]:
    """
    Generate recommendations based on scorecard results

    Lists one "Improve ..." entry per category scoring below 70, then adds
    general advice when the overall score is below 70, and further urgent
    advice when it is also below 50.
    """
    recommendations = [
        f"Improve {category_name} (current score: {category_data['score']:.1f})"
        for category_name, category_data in categories.items()
        if category_data['score'] < 70
    ]

    if not overall_score < 70:
        return recommendations

    recommendations += [
        "Implement comprehensive security training program",
        "Increase security testing automation",
        "Establish formal vulnerability management process",
    ]
    if overall_score < 50:
        recommendations += [
            "Conduct immediate security assessment",
            "Implement security champions program",
            "Review and update security policies",
        ]
    return recommendations
def generate_scorecard_report(self, scorecard: SecurityScorecard) -> Dict[str, Any]:
    """
    Generate detailed scorecard report

    Flattens the scorecard into a plain dict: identifying metadata (with
    the date as an ISO-8601 string), the score/weight/details of every
    category, the recommendations, and the outputs of
    ``compare_to_benchmark`` and ``analyze_trends``.
    """
    metadata = {
        'team_name': scorecard.team_name,
        'project_name': scorecard.project_name,
        'scorecard_date': scorecard.scorecard_date.isoformat(),
        'overall_score': scorecard.overall_score,
        'risk_level': scorecard.risk_level
    }

    # Only these three fields of each category are carried into the report.
    category_scores = {}
    for category_name, category_data in scorecard.categories.items():
        category_scores[category_name] = {
            field: category_data[field] for field in ('score', 'weight', 'details')
        }

    return {
        'scorecard_metadata': metadata,
        'category_scores': category_scores,
        'recommendations': scorecard.recommendations,
        'benchmark_comparison': self.compare_to_benchmark(scorecard),
        'trend_analysis': self.analyze_trends(scorecard)
    }
def compare_to_benchmark(self, scorecard: SecurityScorecard) -> Dict[str, Any]:
    """
    Compare scorecard to industry benchmarks

    For every category on the scorecard, reports the team score, the
    benchmark score, their difference, and whether the team is ``'above'``
    the benchmark or ``'below'`` it (a score equal to the benchmark is
    reported as ``'below'``). Categories without a listed benchmark are
    compared against 70.0.
    """
    # Industry average benchmarks (these would come from real data in practice)
    industry_average = {
        'Secure Coding Practices': 75.0,
        'Security Testing': 68.0,
        'Vulnerability Management': 72.0,
        'Security Awareness': 70.0,
        'Compliance': 85.0
    }
    fallback_benchmark = 70.0

    comparison = {}
    for category_name, category_data in scorecard.categories.items():
        team_score = category_data['score']
        benchmark_score = industry_average.get(category_name, fallback_benchmark)
        if team_score > benchmark_score:
            performance = 'above'
        else:
            performance = 'below'
        comparison[category_name] = {
            'team_score': team_score,
            'benchmark_score': benchmark_score,
            'difference': team_score - benchmark_score,
            'performance': performance
        }
    return comparison
def analyze_trends(self, scorecard: SecurityScorecard) -> Dict[str, Any]:
    """
    Analyze trends for the scorecard

    Returns a fixed mock analysis: a real implementation would compare
    against previous scorecards. Only the ``category_trends`` keys depend
    on the scorecard passed in; every category is reported as ``'stable'``.
    """
    category_trends = dict.fromkeys(scorecard.categories.keys(), 'stable')
    improvement_areas = ['Vulnerability Management', 'Security Awareness']
    strengths = ['Compliance']

    return {
        'overall_trend': 'improving',
        'category_trends': category_trends,
        'improvement_areas': improvement_areas,
        'strengths': strengths
    }
# Example usage
def generate_security_scorecard():
    """
    Walk through building a scorecard for a sample team and printing it

    Scores a hard-coded sample team, prints the headline figures and each
    recommendation, and returns the detailed report dict.
    """
    generator = SecurityScorecardGenerator()

    # Sample team data, one section per scored category
    secure_coding = {
        'security_code_reviews': True,
        'static_analysis_tools': True,
        'trained_developers': 8,
        'total_developers': 10,
        'security_requirements': True,
        'threat_modeling': False,
    }
    security_testing = {
        'ci_cd_security_tests': True,
        'dast_scans': True,
        'iast_implementation': False,
        'penetration_testing': True,
        'security_test_coverage': 65,
    }
    vulnerability_management = {
        'mean_time_to_fix_critical': 18,
        'mean_time_to_fix_high': 45,
        'remediation_rate': 85,
        'tracking_system': True,
        'regular_assessments': True,
    }
    security_awareness = {
        'training_completion_rate': 90,
        'security_champions': True,
        'awareness_campaigns': True,
        'incident_reporting': True,
        'best_practices_documentation': True,
    }
    compliance = {
        'framework_adherence': 95,
        'latest_audit_score': 88,
        'policy_compliance_rate': 92,
        'compliance_automation': True,
        'regular_reviews': True,
    }
    team_data = {
        'team_name': 'Web Development Team',
        'project_name': 'Customer Portal',
        'secure_coding_practices': secure_coding,
        'security_testing': security_testing,
        'vulnerability_management': vulnerability_management,
        'security_awareness': security_awareness,
        'compliance': compliance,
    }

    # Score the team, then expand the scorecard into the detailed report
    scorecard = generator.calculate_team_scorecard(team_data)
    report = generator.generate_scorecard_report(scorecard)

    print(f"Security Scorecard for {scorecard.team_name}")
    print(f"Overall Score: {scorecard.overall_score:.2f}")
    print(f"Risk Level: {scorecard.risk_level}")
    print(f"Recommendations: {len(scorecard.recommendations)}")
    position = 1
    for rec in scorecard.recommendations:
        print(f" {position}. {rec}")
        position += 1
    return report
# Run example
# report = generate_security_scorecard()
Security Dashboards
Real-time Security Dashboards
Dashboard Data Collection
# Example: Security dashboard data collector
import asyncio
import aiohttp
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
import time
import threading
from dataclasses import dataclass
@dataclass
class DashboardMetric:
    """
    One metric sample collected for the dashboard
    """
    # Metric identifier, e.g. 'vulnerability_count'
    name: str
    # Value reported by the data source for this metric
    value: float
    # Moment the sample was recorded by the collector
    timestamp: datetime
    # Name of the data source the sample came from
    source: str
    # Optional free-form labels; None when the sample carries no tags
    tags: Optional[Dict[str, str]] = None
class SecurityDashboardCollector:
    """
    Collect real-time security metrics for dashboard

    A background thread polls every configured data source. The newest value
    of each metric is kept in ``dashboard_data`` and a bounded history of
    samples is kept in ``metrics`` (newest samples win when the cap is hit).
    """

    # Seconds to wait for a single data source before abandoning that poll
    REQUEST_TIMEOUT_SECONDS = 10
    # Pause between polling rounds, and the longer pause after a failed round
    COLLECTION_INTERVAL_SECONDS = 30
    ERROR_BACKOFF_SECONDS = 60
    # (metric name, threshold, alert id, severity, message prefix)
    ALERT_RULES = (
        ('vulnerability_count', 50, 'high_vulnerability_count', 'high',
         'High number of vulnerabilities detected'),
        ('incident_count', 5, 'high_incident_count', 'medium',
         'High number of security incidents'),
        ('policy_violations', 10, 'high_policy_violations', 'medium',
         'High number of policy violations'),
    )

    def __init__(self, max_metric_history: int = 10000):
        """
        Create an idle collector.

        Args:
            max_metric_history: Maximum number of samples retained in
                ``metrics``; older samples are discarded first.
        """
        self.metrics = []
        self.dashboard_data = {}
        self.data_sources = self.configure_data_sources()
        self.is_collecting = False
        self.collection_thread = None
        self.max_metric_history = max_metric_history
        # Lets stop_collection() wake the polling thread instead of waiting
        # out a full sleep interval.
        self._stop_event = threading.Event()

    def configure_data_sources(self) -> Dict[str, Dict[str, Any]]:
        """
        Configure data sources for dashboard metrics

        Returns:
            Mapping of source name to its endpoint URL, nominal polling
            interval in seconds, and the metric names read from its payload.
        """
        return {
            'vulnerability_scanner': {
                'type': 'api',
                'endpoint': 'http://vulnerability-scanner/api/metrics',
                'interval': 300,  # 5 minutes
                'metrics': ['vulnerability_count', 'vulnerability_trends']
            },
            'siem_system': {
                'type': 'api',
                'endpoint': 'http://siem/api/events',
                'interval': 60,  # 1 minute
                'metrics': ['security_events', 'incident_count']
            },
            'compliance_checker': {
                'type': 'api',
                'endpoint': 'http://compliance/api/status',
                'interval': 1800,  # 30 minutes
                'metrics': ['compliance_score', 'policy_violations']
            },
            'network_monitor': {
                'type': 'api',
                'endpoint': 'http://network-monitor/api/traffic',
                'interval': 30,  # 30 seconds
                'metrics': ['suspicious_traffic', 'blocked_connections']
            }
        }

    def _record_metric(self, metric: "DashboardMetric") -> None:
        """Store one sample, trim old history, and refresh the latest-value view."""
        self.metrics.append(metric)
        overflow = len(self.metrics) - self.max_metric_history
        if overflow > 0:
            # Drop the oldest samples so a long-running collector cannot
            # grow without bound.
            del self.metrics[:overflow]
        self.dashboard_data[metric.name] = {
            'value': metric.value,
            'timestamp': metric.timestamp.isoformat(),
            'source': metric.source
        }

    async def collect_from_source(self, source_name: str, source_config: Dict[str, Any]):
        """
        Collect metrics from a specific data source

        Fetches the source's endpoint, and records one sample for every
        configured metric name present in the JSON payload. Failures are
        reported and swallowed so one bad source does not stop the others.
        """
        try:
            timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(source_config['endpoint']) as response:
                    if response.status != 200:
                        print(f"Error collecting from {source_name}: HTTP {response.status}")
                        return
                    data = await response.json()
            # Process the data and create metrics
            for metric_name in source_config['metrics']:
                if metric_name in data:
                    self._record_metric(DashboardMetric(
                        name=metric_name,
                        value=data[metric_name],
                        timestamp=datetime.utcnow(),
                        source=source_name,
                        tags={'source': source_name}
                    ))
        except Exception as e:
            # Deliberately broad: collection is best-effort per source
            print(f"Error collecting from {source_name}: {str(e)}")

    async def collect_all_data(self):
        """
        Collect data from all configured sources

        All sources are polled concurrently; exceptions from individual
        tasks are captured rather than propagated.
        """
        tasks = [
            asyncio.create_task(self.collect_from_source(source_name, source_config))
            for source_name, source_config in self.data_sources.items()
        ]
        await asyncio.gather(*tasks, return_exceptions=True)

    def start_collection(self):
        """
        Start continuous metric collection

        Does nothing when collection is already running.
        """
        if self.is_collecting:
            return
        self.is_collecting = True
        self._stop_event.clear()
        self.collection_thread = threading.Thread(target=self._collection_loop)
        self.collection_thread.daemon = True
        self.collection_thread.start()

    def _collection_loop(self):
        """
        Main collection loop

        Runs one polling round, then waits for the next round or for a stop
        request, whichever comes first.
        """
        while self.is_collecting:
            try:
                # Run collection asynchronously
                asyncio.run(self.collect_all_data())
                delay = self.COLLECTION_INTERVAL_SECONDS
            except Exception as e:
                print(f"Error in collection loop: {str(e)}")
                delay = self.ERROR_BACKOFF_SECONDS  # Wait longer on error
            # Event.wait returns immediately once stop_collection() sets it
            self._stop_event.wait(delay)

    def stop_collection(self):
        """
        Stop metric collection

        Signals the polling thread and waits for it to finish its current
        round; it does not wait out the polling interval.
        """
        self.is_collecting = False
        self._stop_event.set()
        if self.collection_thread:
            self.collection_thread.join()
            self.collection_thread = None

    def get_current_dashboard_data(self) -> Dict[str, Any]:
        """
        Get current dashboard data

        Returns:
            Snapshot with the latest value per metric, the 100 most recent
            samples, the active alerts, and the summary figures.
        """
        return {
            'timestamp': datetime.utcnow().isoformat(),
            'metrics': self.dashboard_data,
            'metric_history': self.get_recent_metrics(100),  # Last 100 metrics
            'alerts': self.get_active_alerts(),
            'summary': self.get_dashboard_summary()
        }

    def get_recent_metrics(self, limit: int = 50) -> List[Dict[str, Any]]:
        """
        Get recent metrics for trend analysis

        Returns:
            Up to ``limit`` samples as plain dictionaries, newest first.
        """
        recent_metrics = sorted(self.metrics, key=lambda m: m.timestamp, reverse=True)[:limit]
        return [
            {
                'name': m.name,
                'value': m.value,
                'timestamp': m.timestamp.isoformat(),
                'source': m.source,
                'tags': m.tags
            }
            for m in recent_metrics
        ]

    def _latest_value(self, metric_name: str, default: Any) -> Any:
        """Return the newest value recorded for a metric, or ``default`` if none."""
        return self.dashboard_data.get(metric_name, {}).get('value', default)

    def get_active_alerts(self) -> List[Dict[str, Any]]:
        """
        Get active security alerts

        An alert is raised for each rule in ``ALERT_RULES`` whose latest
        metric value is strictly greater than the rule's threshold.
        """
        alerts = []
        for metric_name, threshold, alert_id, severity, label in self.ALERT_RULES:
            current = self._latest_value(metric_name, 0)
            if current > threshold:
                alerts.append({
                    'id': alert_id,
                    'severity': severity,
                    'message': f'{label}: {current}',
                    'timestamp': datetime.utcnow().isoformat()
                })
        return alerts

    def get_dashboard_summary(self) -> Dict[str, Any]:
        """
        Get dashboard summary

        Missing metrics default to 0, except the compliance score which
        defaults to 100.
        """
        return {
            'total_vulnerabilities': self._latest_value('vulnerability_count', 0),
            'security_incidents': self._latest_value('incident_count', 0),
            'compliance_score': self._latest_value('compliance_score', 100),
            'active_alerts': len(self.get_active_alerts()),
            'last_update': datetime.utcnow().isoformat()
        }
# Example dashboard API
from flask import Flask, jsonify
# Flask application that serves the dashboard API routes defined below
app = Flask(__name__)
# Single collector instance shared by all route handlers and the example runner
dashboard_collector = SecurityDashboardCollector()
@app.route('/api/dashboard/realtime')
def get_realtime_dashboard():
    """
    Serve the collector's full current snapshot as a JSON response
    """
    return jsonify(dashboard_collector.get_current_dashboard_data())
@app.route('/api/dashboard/summary')
def get_dashboard_summary():
    """
    Serve the collector's summary figures as a JSON response
    """
    return jsonify(dashboard_collector.get_dashboard_summary())
@app.route('/api/dashboard/alerts')
def get_dashboard_alerts():
    """
    Serve the currently active alerts as a JSON response
    """
    return jsonify(dashboard_collector.get_active_alerts())
@app.route('/api/dashboard/trends')
def get_dashboard_trends():
    """
    Serve the 50 most recent metric samples as a JSON response
    """
    return jsonify(dashboard_collector.get_recent_metrics(50))
# Example usage
def run_dashboard_example():
    """
    Run the shared collector briefly and print what it gathered.

    Starts collection, waits ten seconds, prints the summary figures and
    any active alerts, and always stops the collector afterwards.
    """
    dashboard_collector.start_collection()
    try:
        # Give the background thread time to complete a polling round
        time.sleep(10)
        snapshot = dashboard_collector.get_current_dashboard_data()

        print("Dashboard Summary:")
        summary = snapshot['summary']
        print(f" Total Vulnerabilities: {summary['total_vulnerabilities']}")
        print(f" Security Incidents: {summary['security_incidents']}")
        print(f" Compliance Score: {summary['compliance_score']}")
        print(f" Active Alerts: {summary['active_alerts']}")

        print("\nActive Alerts:")
        for alert in snapshot['alerts']:
            severity = alert['severity'].upper()
            print(f" - {severity}: {alert['message']}")
    finally:
        # Stop the background thread even if printing failed
        dashboard_collector.stop_collection()
# Run example (commented out to prevent actual API startup)
# run_dashboard_example()
Dashboard Visualization
Dashboard Frontend Integration
<!-- Example: Security dashboard HTML template -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>DevSecOps Security Dashboard</title>
<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
}
.dashboard-container {
max-width: 1200px;
margin: 0 auto;
}
.dashboard-header {
background-color: #2c3e50;
color: white;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
}
.metrics-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 20px;
margin-bottom: 20px;
}
.metric-card {
background-color: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.metric-value {
font-size: 2em;
font-weight: bold;
color: #3498db;
}
.metric-title {
font-size: 1em;
color: #7f8c8d;
margin-bottom: 10px;
}
.chart-container {
background-color: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
.alerts-section {
background-color: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.alert-item {
padding: 10px;
margin: 5px 0;
border-radius: 4px;
border-left: 4px solid #e74c3c;
}
.alert-high { border-left-color: #e74c3c; }
.alert-medium { border-left-color: #f39c12; }
.alert-low { border-left-color: #3498db; }
</style>
</head>
<body>
<div class="dashboard-container">
<div class="dashboard-header">
<h1>DevSecOps Security Dashboard</h1>
<p>Real-time security metrics and monitoring</p>
</div>
<div class="metrics-grid">
<div class="metric-card">
<div class="metric-title">Total Vulnerabilities</div>
<div class="metric-value" id="total-vulnerabilities">0</div>
<div class="metric-trend">↓ 12% from last week</div>
</div>
<div class="metric-card">
<div class="metric-title">Security Incidents</div>
<div class="metric-value" id="security-incidents">0</div>
<div class="metric-trend">↑ 5% from last week</div>
</div>
<div class="metric-card">
<div class="metric-title">Compliance Score</div>
<div class="metric-value" id="compliance-score">0%</div>
<div class="metric-trend">→ Stable</div>
</div>
<div class="metric-card">
<div class="metric-title">Active Alerts</div>
<div class="metric-value" id="active-alerts">0</div>
<div class="metric-trend">↓ 20% from last week</div>
</div>
</div>
<div class="chart-container">
<h3>Vulnerability Trends (Last 30 Days)</h3>
<div id="vulnerability-trend-chart" style="height: 400px;"></div>
</div>
<div class="chart-container">
<h3>Security Incidents by Type</h3>
<div id="incident-type-chart" style="height: 400px;"></div>
</div>
<div class="alerts-section">
<h3>Active Security Alerts</h3>
<div id="alerts-container">
<p>No active alerts</p>
</div>
</div>
</div>
<script>
/**
 * Fetch the real-time dashboard snapshot and render it.
 * Failures (network errors, HTTP error statuses, invalid JSON) are logged
 * and the page keeps showing the previous data.
 */
async function fetchDashboardData() {
    try {
        const response = await fetch('/api/dashboard/realtime');
        // fetch() only rejects on network failure, so HTTP error statuses
        // have to be detected explicitly before parsing the body.
        if (!response.ok) {
            throw new Error(`Dashboard API responded with HTTP ${response.status}`);
        }
        const data = await response.json();
        updateDashboard(data);
    } catch (error) {
        console.error('Error fetching dashboard data:', error);
    }
}
/**
 * Push a dashboard snapshot into the page: the four metric cards first,
 * then the two charts and the alert list.
 */
function updateDashboard(data) {
    const summary = data.summary;

    // Element id of each metric card -> text it should display
    const cardText = {
        'total-vulnerabilities': summary.total_vulnerabilities,
        'security-incidents': summary.security_incidents,
        'compliance-score': summary.compliance_score + '%',
        'active-alerts': summary.active_alerts
    };
    for (const [elementId, text] of Object.entries(cardText)) {
        document.getElementById(elementId).textContent = text;
    }

    // Charts and alerts
    updateVulnerabilityTrendChart(data.metric_history);
    updateIncidentTypeChart(data.metrics);
    updateAlerts(data.alerts);
}
/**
 * Plot the most recent vulnerability_count samples as a time series.
 *
 * @param {Array<{name: string, value: number, timestamp: string}>} metricHistory
 *        Metric samples in any order; only vulnerability_count entries are used.
 */
function updateVulnerabilityTrendChart(metricHistory) {
    const vulnerabilityData = metricHistory
        .filter(m => m.name === 'vulnerability_count')
        .map(m => ({ time: new Date(m.timestamp), value: m.value }))
        // Sort oldest-to-newest so the line runs left to right regardless of
        // the order the API returned the samples in.
        .sort((a, b) => a.time - b.time)
        .slice(-30); // Latest 30 data points
    if (vulnerabilityData.length === 0) return;

    const trace = {
        // Date objects give a real time axis; formatting them as day strings
        // would stack every same-day sample on a single category.
        x: vulnerabilityData.map(point => point.time),
        y: vulnerabilityData.map(point => point.value),
        type: 'scatter',
        mode: 'lines+markers',
        name: 'Vulnerabilities'
    };
    const layout = {
        title: 'Vulnerability Trends',
        xaxis: { title: 'Date', type: 'date' },
        yaxis: { title: 'Number of Vulnerabilities' }
    };
    // react() updates an existing plot in place and creates it on first call,
    // which suits a chart that is redrawn on every refresh.
    Plotly.react('vulnerability-trend-chart', [trace], layout);
}
/**
 * Draw the incidents-by-type bar chart.
 * The counts are hard-coded sample data; the `metrics` argument is not read.
 */
function updateIncidentTypeChart(metrics) {
    // Mock data for incident types
    const incidentTypes = {
        'Malware': 15,
        'Unauthorized Access': 8,
        'DDoS': 3,
        'Phishing': 12,
        'Data Breach': 2
    };

    Plotly.newPlot(
        'incident-type-chart',
        [{
            x: Object.keys(incidentTypes),
            y: Object.values(incidentTypes),
            type: 'bar',
            name: 'Incident Types'
        }],
        {
            title: 'Security Incidents by Type',
            xaxis: { title: 'Incident Type' },
            yaxis: { title: 'Number of Incidents' }
        }
    );
}
/**
 * Render the list of active alerts, or a placeholder when there are none.
 *
 * @param {Array<{severity: string, message: string, timestamp: string}>} alerts
 */
function updateAlerts(alerts) {
    const alertsContainer = document.getElementById('alerts-container');
    if (alerts.length === 0) {
        alertsContainer.innerHTML = '<p>No active alerts</p>';
        return;
    }

    // Build nodes and assign text via textContent so alert fields coming from
    // the API are never interpreted as HTML.
    const items = alerts.map(alert => {
        const item = document.createElement('div');
        item.className = `alert-item alert-${alert.severity}`;

        const label = document.createElement('strong');
        label.textContent = `${alert.severity.toUpperCase()}:`;

        const stamp = document.createElement('small');
        stamp.textContent = new Date(alert.timestamp).toLocaleString();

        item.append(label, ` ${alert.message}`, document.createElement('br'), stamp);
        return item;
    });
    alertsContainer.replaceChildren(...items);
}
// Initial data fetch so the page is populated without waiting for the first timer tick
fetchDashboardData();
// Update every 30 seconds (interval is given in milliseconds)
setInterval(fetchDashboardData, 30000);
</script>
</body>
</html>
Security Reporting
Automated Security Reports
Report Generation Framework
# Example: Security report generation framework
import html
import json
import os
import smtplib
from datetime import datetime, timedelta
from email import encoders
# The standard-library classes are spelled MIME* (all caps); alias them to the
# Mime* names this snippet uses so the imports resolve.
from email.mime.base import MIMEBase as MimeBase
from email.mime.multipart import MIMEMultipart as MimeMultipart
from email.mime.text import MIMEText as MimeText
from typing import Any, Dict, List, Optional

import jinja2
import pdfkit
class SecurityReportGenerator:
    """
    Generate automated security reports

    Reports are rendered from Jinja2 templates into Markdown text. For PDF
    export and e-mail delivery the Markdown is escaped and wrapped in a
    minimal HTML document, because both consumers expect HTML input.
    """

    def __init__(self):
        self.templates = self.load_report_templates()
        self.report_config = self.load_report_config()

    def load_report_templates(self) -> Dict[str, str]:
        """
        Load report templates

        Returns:
            Mapping of report type to its Jinja2 template source.
        """
        templates = {
            'executive_summary': """
# Security Executive Summary - {{ date }}
## Key Metrics
- **Total Vulnerabilities**: {{ metrics.total_vulnerabilities }}
- **Security Incidents**: {{ metrics.security_incidents }}
- **Compliance Score**: {{ metrics.compliance_score }}%
- **Mean Time to Remediate**: {{ metrics.mttr }} hours
## Risk Assessment
- **Current Risk Level**: {{ risk_level }}
- **Trend**: {{ trend_analysis }}
## Key Achievements
{% for achievement in achievements %}
- {{ achievement }}
{% endfor %}
## Areas for Improvement
{% for improvement in improvements %}
- {{ improvement }}
{% endfor %}
## Recommendations
{% for recommendation in recommendations %}
{{ loop.index }}. {{ recommendation }}
{% endfor %}
""",
            'technical_report': """
# Technical Security Report - {{ date }}
## Vulnerability Analysis
### Critical Vulnerabilities ({{ vulnerability_summary.critical }})
{% for vuln in critical_vulnerabilities %}
- {{ vuln.name }}: {{ vuln.description }}
{% endfor %}
### High Vulnerabilities ({{ vulnerability_summary.high }})
{% for vuln in high_vulnerabilities %}
- {{ vuln.name }}: {{ vuln.description }}
{% endfor %}
## Incident Analysis
{% for incident in incidents %}
### Incident: {{ incident.title }}
- **Severity**: {{ incident.severity }}
- **Status**: {{ incident.status }}
- **Detected**: {{ incident.detected_at }}
- **Resolved**: {{ incident.resolved_at if incident.resolved_at else 'Not resolved' }}
- **Description**: {{ incident.description }}
{% endfor %}
## Compliance Status
- **Overall Compliance**: {{ compliance_score }}%
- **Framework**: {{ compliance_framework }}
- **Last Audit**: {{ last_audit_date }}
## Security Controls Status
{% for control, status in security_controls.items() %}
- **{{ control }}**: {{ status }}
{% endfor %}
""",
            'compliance_report': """
# Compliance Report - {{ date }}
## Compliance Framework: {{ framework }}
### Controls Assessment
{% for control in controls %}
#### {{ control.name }}
- **Status**: {{ control.status }}
- **Evidence**: {{ control.evidence }}
- **Finding**: {{ control.finding if control.finding else 'None' }}
{% endfor %}
### Gap Analysis
{% for gap in gaps %}
- {{ gap.description }} ({{ gap.severity }})
{% endfor %}
### Recommendations
{% for rec in recommendations %}
- {{ rec }}
{% endfor %}
### Next Audit
- **Scheduled**: {{ next_audit_date }}
- **Focus Areas**: {{ focus_areas }}
"""
        }
        return templates

    def load_report_config(self) -> Dict[str, Any]:
        """
        Load report configuration

        Returns:
            Supported output formats, per-audience distribution lists, and
            the report types scheduled for each cadence.
        """
        return {
            'report_formats': ['pdf', 'html', 'json'],
            'distribution_lists': {
                'executive': ['[email protected]', '[email protected]'],
                'technical': ['[email protected]', '[email protected]'],
                'compliance': ['[email protected]', '[email protected]']
            },
            'scheduling': {
                'daily': ['security_summary'],
                'weekly': ['executive_summary', 'technical_report'],
                'monthly': ['compliance_report', 'trend_analysis']
            }
        }

    def generate_executive_summary(self, data: Dict[str, Any]) -> str:
        """
        Generate executive summary report

        Args:
            data: Report inputs; missing metrics default to 0 and missing
                lists default to empty.

        Returns:
            The rendered Markdown text.
        """
        template = jinja2.Template(self.templates['executive_summary'])
        report_data = {
            'date': datetime.utcnow().strftime('%Y-%m-%d'),
            'metrics': {
                'total_vulnerabilities': data.get('total_vulnerabilities', 0),
                'security_incidents': data.get('security_incidents', 0),
                'compliance_score': data.get('compliance_score', 0),
                'mttr': data.get('mean_time_to_remediate', 0)
            },
            'risk_level': self.calculate_risk_level(data),
            'trend_analysis': self.analyze_trends(data),
            'achievements': data.get('achievements', []),
            'improvements': data.get('improvements', []),
            'recommendations': data.get('recommendations', [])
        }
        return template.render(**report_data)

    def generate_technical_report(self, data: Dict[str, Any]) -> str:
        """
        Generate technical security report

        Args:
            data: Report inputs; missing collections default to empty and
                missing descriptive fields default to 'N/A'.

        Returns:
            The rendered Markdown text.
        """
        template = jinja2.Template(self.templates['technical_report'])
        report_data = {
            'date': datetime.utcnow().strftime('%Y-%m-%d'),
            'vulnerability_summary': data.get('vulnerability_summary', {}),
            'critical_vulnerabilities': data.get('critical_vulnerabilities', []),
            'high_vulnerabilities': data.get('high_vulnerabilities', []),
            'incidents': data.get('incidents', []),
            'compliance_score': data.get('compliance_score', 0),
            'compliance_framework': data.get('compliance_framework', 'N/A'),
            'last_audit_date': data.get('last_audit_date', 'N/A'),
            'security_controls': data.get('security_controls', {})
        }
        return template.render(**report_data)

    def generate_compliance_report(self, data: Dict[str, Any]) -> str:
        """
        Generate compliance report

        Args:
            data: Report inputs. ``focus_areas`` may be a string or an
                iterable of items; iterables are joined with ', ' so the
                report shows readable text rather than a Python list repr.

        Returns:
            The rendered Markdown text.
        """
        template = jinja2.Template(self.templates['compliance_report'])
        focus_areas = data.get('focus_areas') or []
        if not isinstance(focus_areas, str):
            focus_areas = ', '.join(str(area) for area in focus_areas)
        report_data = {
            'date': datetime.utcnow().strftime('%Y-%m-%d'),
            'framework': data.get('framework', 'N/A'),
            'controls': data.get('controls', []),
            'gaps': data.get('gaps', []),
            'recommendations': data.get('recommendations', []),
            'next_audit_date': data.get('next_audit_date', 'N/A'),
            'focus_areas': focus_areas
        }
        return template.render(**report_data)

    def calculate_risk_level(self, data: Dict[str, Any]) -> str:
        """
        Calculate overall risk level

        The score is 0.3 * vulnerabilities + 0.4 * incidents +
        0.3 * (100 - compliance score); missing inputs count as no risk.

        Returns:
            'Critical' (>= 80), 'High' (>= 60), 'Medium' (>= 40) or 'Low'.
        """
        vuln_score = data.get('total_vulnerabilities', 0) * 0.3
        incident_score = data.get('security_incidents', 0) * 0.4
        compliance_score = (100 - data.get('compliance_score', 100)) * 0.3
        total_score = vuln_score + incident_score + compliance_score
        if total_score >= 80:
            return 'Critical'
        if total_score >= 60:
            return 'High'
        if total_score >= 40:
            return 'Medium'
        return 'Low'

    def analyze_trends(self, data: Dict[str, Any]) -> str:
        """
        Analyze security trends

        Placeholder: returns a fixed sentence and does not read ``data``.
        """
        # This would typically compare to previous periods
        # For this example, we'll provide a mock analysis
        return "Security posture is improving with reduced vulnerability count"

    @staticmethod
    def _render_as_html(markdown_text: str) -> str:
        """Escape report text and wrap it in a minimal HTML page that keeps its line breaks."""
        escaped = html.escape(markdown_text)
        return (
            '<!DOCTYPE html><html><head><meta charset="UTF-8"></head><body>'
            '<pre style="white-space: pre-wrap; font-family: sans-serif;">'
            f'{escaped}</pre></body></html>'
        )

    def generate_pdf_report(self, html_content: str, filename: str):
        """
        Generate PDF report from HTML content

        Args:
            html_content: HTML markup to convert.
            filename: Path of the PDF file to write.
        """
        options = {
            'page-size': 'A4',
            'margin-top': '0.75in',
            'margin-right': '0.75in',
            'margin-bottom': '0.75in',
            'margin-left': '0.75in',
            'encoding': "UTF-8",
            'no-outline': None
        }
        pdfkit.from_string(html_content, filename, options=options)

    def send_report_email(self, subject: str, body: str, recipients: List[str],
                          attachments: Optional[List[str]] = None):
        """
        Send report via email

        Connection settings come from the SMTP_SERVER, SMTP_PORT, SMTP_USER,
        SMTP_PASSWORD and SMTP_FROM environment variables.

        Args:
            subject: Message subject line.
            body: HTML body of the message.
            recipients: Addresses placed in the To header.
            attachments: Optional paths of files to attach.
        """
        msg = MimeMultipart()
        msg['Subject'] = subject
        msg['From'] = os.getenv('SMTP_FROM', '[email protected]')
        msg['To'] = ', '.join(recipients)
        msg.attach(MimeText(body, 'html'))

        # Add attachments if provided
        for file_path in attachments or []:
            part = MimeBase('application', 'octet-stream')
            with open(file_path, 'rb') as attachment:
                part.set_payload(attachment.read())
            encoders.encode_base64(part)
            # Passing filename as a header parameter lets the email package
            # quote/encode it correctly instead of hand-building the value.
            part.add_header(
                'Content-Disposition',
                'attachment',
                filename=os.path.basename(file_path)
            )
            msg.attach(part)

        # Send email
        smtp_server = os.getenv('SMTP_SERVER', 'localhost')
        smtp_port = int(os.getenv('SMTP_PORT', 587))
        smtp_user = os.getenv('SMTP_USER')
        smtp_password = os.getenv('SMTP_PASSWORD')
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            # Only authenticate when credentials are configured; calling
            # login() with missing values fails before anything is sent.
            if smtp_user and smtp_password:
                server.login(smtp_user, smtp_password)
            server.send_message(msg)

    def generate_and_distribute_report(self, report_type: str, data: Dict[str, Any],
                                       recipients: Optional[List[str]] = None):
        """
        Generate and distribute security report

        Args:
            report_type: 'executive_summary', 'technical_report' or
                'compliance_report'.
            data: Inputs passed to the matching generator.
            recipients: Optional addresses; when given, the PDF is e-mailed.

        Returns:
            The filename of the generated PDF.

        Raises:
            ValueError: If ``report_type`` is not a known report type.
        """
        renderers = {
            'executive_summary': self.generate_executive_summary,
            'technical_report': self.generate_technical_report,
            'compliance_report': self.generate_compliance_report,
        }
        try:
            render = renderers[report_type]
        except KeyError:
            raise ValueError(f"Unknown report type: {report_type}") from None

        # The templates produce Markdown; wrap it so the PDF converter and
        # the HTML mail part receive real HTML with line breaks preserved.
        html_content = self._render_as_html(render(data))

        # One timestamp for both the filename and the subject line
        generated_at = datetime.utcnow()
        filename = f"security_report_{generated_at.strftime('%Y%m%d_%H%M%S')}.pdf"
        self.generate_pdf_report(html_content, filename)

        # Send email if recipients provided
        if recipients:
            self.send_report_email(
                subject=f"Security Report - {generated_at.strftime('%Y-%m-%d')}",
                body=html_content,
                recipients=recipients,
                attachments=[filename]
            )
        return filename
# Example usage
def generate_security_reports():
    """
    Render the sample reports and distribute one of them.

    Builds illustrative report data, renders the executive summary and the
    technical report, then generates and e-mails an executive summary PDF.

    Returns:
        Tuple of (executive summary text, technical report text).
    """
    generator = SecurityReportGenerator()

    # Sample data, grouped by the report section that consumes it
    severity_counts = {
        'critical': 2,
        'high': 8,
        'medium': 20,
        'low': 15,
    }
    critical_findings = [
        {'name': 'CVE-2023-1234', 'description': 'SQL Injection in user authentication'},
        {'name': 'CVE-2023-5678', 'description': 'Authentication bypass vulnerability'},
    ]
    high_findings = [
        {'name': 'CVE-2023-9012', 'description': 'Cross-site scripting vulnerability'},
    ]
    recorded_incidents = [
        {
            'title': 'Suspicious login attempt',
            'severity': 'high',
            'status': 'resolved',
            'detected_at': '2023-12-01T10:30:00Z',
            'resolved_at': '2023-12-01T12:15:00Z',
            'description': 'Multiple failed login attempts from unusual location',
        },
    ]
    report_data = {
        'total_vulnerabilities': 45,
        'security_incidents': 3,
        'compliance_score': 92.5,
        'mean_time_to_remediate': 18.5,
        'vulnerability_summary': severity_counts,
        'critical_vulnerabilities': critical_findings,
        'high_vulnerabilities': high_findings,
        'incidents': recorded_incidents,
        'achievements': [
            'Reduced critical vulnerabilities by 25%',
            'Improved mean time to detect from 4 hours to 1 hour',
        ],
        'improvements': [
            'Vulnerability remediation process needs acceleration',
            'Security awareness training completion rate at 78% (target 90%)',
        ],
        'recommendations': [
            'Implement automated security testing in CI/CD pipeline',
            'Conduct quarterly penetration testing',
            'Enhance security monitoring capabilities',
        ],
    }

    # Render the two text reports
    exec_report = generator.generate_executive_summary(report_data)
    print("Executive Summary Generated")
    tech_report = generator.generate_technical_report(report_data)
    print("Technical Report Generated")

    # Produce the PDF and mail it
    filename = generator.generate_and_distribute_report(
        report_type='executive_summary',
        data=report_data,
        recipients=['[email protected]'],
    )
    print(f"Report generated and sent: {filename}")

    return exec_report, tech_report
# Run example
# exec_report, tech_report = generate_security_reports()
Conclusion
DevSecOps security metrics and reporting provide the visibility and accountability necessary to maintain and improve security posture in continuous delivery environments. Effective security metrics should be:
- Actionable: Metrics should drive specific actions and improvements
- Measurable: Metrics should be quantifiable and trackable over time
- Relevant: Metrics should align with business and security objectives
- Timely: Metrics should be collected and reported in real-time or near real-time
The key to successful security metrics and reporting is establishing a comprehensive framework that covers all aspects of the security program while providing clear, actionable insights to stakeholders at all levels of the organization.
In the next article, we'll explore DevSecOps security culture and training, examining how to build and maintain a strong security culture within development teams.