CloudTadaInsights

DevSecOps Security Metrics and Reporting

Overview

DevSecOps security metrics and reporting provide the visibility and accountability necessary to maintain and improve security posture in continuous delivery environments. This article explores how to establish meaningful security metrics, create effective reporting mechanisms, and build dashboards that enable data-driven security decisions.

Security Metrics Framework

Defining Security Metrics

Key Security Indicators

Security metrics in DevSecOps environments must balance security effectiveness with development velocity:

PYTHON
# Example: Security metrics framework
import time
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import statistics
from dataclasses import dataclass
from enum import Enum

class SecurityMetricType(Enum):
    VULNERABILITY = "vulnerability"
    INCIDENT = "incident"
    COMPLIANCE = "compliance"
    PROCESS = "process"
    PERFORMANCE = "performance"

@dataclass
class SecurityMetric:
    name: str
    value: float
    unit: str
    timestamp: datetime
    metric_type: SecurityMetricType
    tags: Optional[Dict[str, str]] = None  # None when the metric carries no tags
    description: str = ""

class SecurityMetricsCollector:
    """
    Collect and aggregate security metrics
    """
    
    def __init__(self):
        self.metrics = []
        self.metrics_store = {}
        self.alert_thresholds = self.define_alert_thresholds()
        self.kpi_definitions = self.define_kpi_definitions()
    
    def define_alert_thresholds(self) -> Dict[str, Dict[str, float]]:
        """
        Define alert thresholds for security metrics
        """
        return {
            'vulnerability_count': {
                'critical': 0,
                'high': 5,
                'medium': 20,
                'low': 50
            },
            'mean_time_to_remediate': {
                'critical': 24,  # hours
                'high': 72,      # hours
                'medium': 168    # hours (1 week)
            },
            'security_incidents': {
                'daily_threshold': 5,
                'weekly_threshold': 20
            },
            'compliance_score': {
                'minimum_acceptable': 95.0  # percentage
            }
        }
    
    def define_kpi_definitions(self) -> Dict[str, Dict[str, Any]]:
        """
        Define Key Performance Indicators for security
        """
        return {
            'security_velocity': {
                'name': 'Security Velocity',
                'description': 'Rate of security improvements vs new vulnerabilities',
                'formula': '(vulnerabilities_fixed / vulnerabilities_found) * 100',
                'target': 100.0,
                'unit': 'percentage'
            },
            'mean_time_to_detect': {
                'name': 'Mean Time to Detect (MTTD)',
                'description': 'Average time to detect security incidents',
                'formula': 'average(time_from_occurrence_to_detection)',
                'target': 1.0,  # hours
                'unit': 'hours'
            },
            'mean_time_to_remediate': {
                'name': 'Mean Time to Remediate (MTTR)',
                'description': 'Average time to fix security vulnerabilities',
                'formula': 'average(time_from_detection_to_remediation)',
                'target': 24.0,  # hours for critical
                'unit': 'hours'
            },
            'security_coverage': {
                'name': 'Security Test Coverage',
                'description': 'Percentage of code covered by security tests',
                'formula': '(security_tested_lines / total_code_lines) * 100',
                'target': 80.0,
                'unit': 'percentage'
            },
            'compliance_score': {
                'name': 'Compliance Score',
                'description': 'Overall compliance with security standards',
                'formula': '(compliant_items / total_items) * 100',
                'target': 95.0,
                'unit': 'percentage'
            },
            'security_awareness': {
                'name': 'Security Awareness',
                'description': 'Team security training completion rate',
                'formula': '(trained_team_members / total_team_members) * 100',
                'target': 100.0,
                'unit': 'percentage'
            }
        }
    
    def collect_vulnerability_metrics(self, scan_results: Dict[str, Any]) -> List[SecurityMetric]:
        """
        Collect vulnerability-related metrics
        """
        metrics = []
        
        # Count vulnerabilities by severity
        vuln_counts = scan_results.get('vulnerability_summary', {})
        
        for severity, count in vuln_counts.items():
            metrics.append(SecurityMetric(
                name=f'vulnerability_count_{severity}',
                value=count,
                unit='count',
                timestamp=datetime.utcnow(),
                metric_type=SecurityMetricType.VULNERABILITY,
                tags={'severity': severity},
                description=f'Number of {severity} severity vulnerabilities found'
            ))
        
        # Calculate vulnerability density
        total_vulns = sum(vuln_counts.values())
        code_lines = scan_results.get('code_lines_scanned', 100000)  # Default to 100k lines
        # Guard against a zero line count to avoid ZeroDivisionError
        vuln_density = (total_vulns / code_lines) * 1000 if code_lines > 0 else 0.0  # per 1000 lines of code
        
        metrics.append(SecurityMetric(
            name='vulnerability_density',
            value=vuln_density,
            unit='vulnerabilities_per_kloc',
            timestamp=datetime.utcnow(),
            metric_type=SecurityMetricType.VULNERABILITY,
            description='Vulnerabilities per 1000 lines of code'
        ))
        
        return metrics
    
    def collect_incident_metrics(self, incident_data: Dict[str, Any]) -> List[SecurityMetric]:
        """
        Collect incident-related metrics
        """
        metrics = []
        
        # Count incidents by severity
        severity_counts = {}
        for incident in incident_data.get('incidents', []):
            severity = incident.get('severity', 'unknown')
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
        
        for severity, count in severity_counts.items():
            metrics.append(SecurityMetric(
                name=f'incidents_count_{severity}',
                value=count,
                unit='count',
                timestamp=datetime.utcnow(),
                metric_type=SecurityMetricType.INCIDENT,
                tags={'severity': severity},
                description=f'Number of {severity} severity incidents'
            ))
        
        # Calculate mean time to detect and remediate
        if incident_data.get('incidents'):
            detection_times = []
            remediation_times = []
            
            for incident in incident_data['incidents']:
                detected_at = datetime.fromisoformat(incident['detected_at'])
                occurred_at = datetime.fromisoformat(incident['occurred_at'])
                resolved_at = datetime.fromisoformat(incident['resolved_at']) if incident.get('resolved_at') else datetime.utcnow()
                
                detection_time = (detected_at - occurred_at).total_seconds() / 3600  # hours
                remediation_time = (resolved_at - detected_at).total_seconds() / 3600  # hours
                
                detection_times.append(detection_time)
                remediation_times.append(remediation_time)
            
            if detection_times:
                avg_detection_time = statistics.mean(detection_times)
                metrics.append(SecurityMetric(
                    name='mean_time_to_detect',
                    value=avg_detection_time,
                    unit='hours',
                    timestamp=datetime.utcnow(),
                    metric_type=SecurityMetricType.PERFORMANCE,
                    description='Average time to detect security incidents'
                ))
            
            if remediation_times:
                avg_remediation_time = statistics.mean(remediation_times)
                metrics.append(SecurityMetric(
                    name='mean_time_to_remediate',
                    value=avg_remediation_time,
                    unit='hours',
                    timestamp=datetime.utcnow(),
                    metric_type=SecurityMetricType.PERFORMANCE,
                    description='Average time to remediate security incidents'
                ))
        
        return metrics
    
    def collect_compliance_metrics(self, compliance_data: Dict[str, Any]) -> List[SecurityMetric]:
        """
        Collect compliance-related metrics
        """
        metrics = []
        
        # Calculate compliance score
        compliant_items = compliance_data.get('compliant_items', 0)
        total_items = compliance_data.get('total_items', 1)
        
        compliance_score = (compliant_items / total_items) * 100 if total_items > 0 else 0
        
        metrics.append(SecurityMetric(
            name='compliance_score',
            value=compliance_score,
            unit='percentage',
            timestamp=datetime.utcnow(),
            metric_type=SecurityMetricType.COMPLIANCE,
            description='Overall compliance with security standards'
        ))
        
        # Count compliance violations by type
        violations_by_type = compliance_data.get('violations_by_type', {})
        for violation_type, count in violations_by_type.items():
            metrics.append(SecurityMetric(
                name=f'compliance_violations_{violation_type}',
                value=count,
                unit='count',
                timestamp=datetime.utcnow(),
                metric_type=SecurityMetricType.COMPLIANCE,
                tags={'violation_type': violation_type},
                description=f'Number of {violation_type} compliance violations'
            ))
        
        return metrics
    
    def collect_process_metrics(self, process_data: Dict[str, Any]) -> List[SecurityMetric]:
        """
        Collect security process metrics
        """
        metrics = []
        
        # Security test coverage
        security_tested_lines = process_data.get('security_tested_lines', 0)
        total_lines = process_data.get('total_lines', 1)
        
        coverage_percentage = (security_tested_lines / total_lines) * 100 if total_lines > 0 else 0
        
        metrics.append(SecurityMetric(
            name='security_test_coverage',
            value=coverage_percentage,
            unit='percentage',
            timestamp=datetime.utcnow(),
            metric_type=SecurityMetricType.PROCESS,
            description='Percentage of code covered by security tests'
        ))
        
        # Security training completion
        trained_members = process_data.get('trained_team_members', 0)
        total_members = process_data.get('total_team_members', 1)
        
        training_completion = (trained_members / total_members) * 100 if total_members > 0 else 0
        
        metrics.append(SecurityMetric(
            name='security_training_completion',
            value=training_completion,
            unit='percentage',
            timestamp=datetime.utcnow(),
            metric_type=SecurityMetricType.PROCESS,
            description='Team security training completion rate'
        ))
        
        # Mean time to security review
        review_times = process_data.get('security_review_times', [])
        if review_times:
            avg_review_time = statistics.mean(review_times)
            metrics.append(SecurityMetric(
                name='mean_time_to_security_review',
                value=avg_review_time,
                unit='hours',
                timestamp=datetime.utcnow(),
                metric_type=SecurityMetricType.PROCESS,
                description='Average time to complete security reviews'
            ))
        
        return metrics
    
    def calculate_kpi(self, kpi_name: str, data: Dict[str, Any]) -> float:
        """
        Calculate a specific KPI value
        """
        kpi_def = self.kpi_definitions.get(kpi_name)
        if not kpi_def:
            raise ValueError(f"KPI {kpi_name} not defined")
        
        if kpi_name == 'security_velocity':
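            vulnerabilities_found = data.get('vulnerabilities_found', 1)
            vulnerabilities_fixed = data.get('vulnerabilities_fixed', 0)
            # Avoid dividing by zero when the caller reports no vulnerabilities found
            return (vulnerabilities_fixed / vulnerabilities_found) * 100 if vulnerabilities_found > 0 else 0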
        
        elif kpi_name == 'mean_time_to_detect':
            detection_times = data.get('detection_times', [])
            return statistics.mean(detection_times) if detection_times else 0
        
        elif kpi_name == 'mean_time_to_remediate':
            remediation_times = data.get('remediation_times', [])
            return statistics.mean(remediation_times) if remediation_times else 0
        
        elif kpi_name == 'security_coverage':
            security_tested_lines = data.get('security_tested_lines', 0)
            total_lines = data.get('total_lines', 1)
            return (security_tested_lines / total_lines) * 100 if total_lines > 0 else 0
        
        elif kpi_name == 'compliance_score':
            compliant_items = data.get('compliant_items', 0)
            total_items = data.get('total_items', 1)
            return (compliant_items / total_items) * 100 if total_items > 0 else 0
        
        elif kpi_name == 'security_awareness':
            trained_members = data.get('trained_team_members', 0)
            total_members = data.get('total_team_members', 1)
            return (trained_members / total_members) * 100 if total_members > 0 else 0
        
        return 0.0
    
    def generate_security_report(self, time_range: str = '24h') -> Dict[str, Any]:
        """
        Generate comprehensive security report
        """
        # Calculate time range
        if time_range == '24h':
            start_time = datetime.utcnow() - timedelta(hours=24)
        elif time_range == '7d':
            start_time = datetime.utcnow() - timedelta(days=7)
        elif time_range == '30d':
            start_time = datetime.utcnow() - timedelta(days=30)
        else:
            start_time = datetime.utcnow() - timedelta(days=1)
        
        # Aggregate metrics within time range
        relevant_metrics = [m for m in self.metrics if m.timestamp >= start_time]
        
        # Group metrics by type
        metrics_by_type = {}
        for metric in relevant_metrics:
            metric_type = metric.metric_type.value
            if metric_type not in metrics_by_type:
                metrics_by_type[metric_type] = []
            metrics_by_type[metric_type].append(metric)
        
        # Calculate KPIs
        kpi_data = {}
        for kpi_name in self.kpi_definitions.keys():
            try:
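                # NOTE: an empty dict is passed here, so every KPI evaluates to 0
                # until real aggregated inputs are supplied
                kpi_value = self.calculate_kpi(kpi_name, {})
                target = self.kpi_definitions[kpi_name]['target']
                # Time-based KPIs (MTTD, MTTR) are met at or below target;
                # percentage KPIs are met at or above target
                lower_is_better = self.kpi_definitions[kpi_name]['unit'] == 'hours'
                target_met = kpi_value <= target if lower_is_better else kpi_value >= target
                kpi_data[kpi_name] = {
                    'value': kpi_value,
                    'target': target,
                    'status': 'met' if target_met else 'not_met'
                }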
            except Exception as e:
                kpi_data[kpi_name] = {
                    'value': 0,
                    'target': self.kpi_definitions[kpi_name]['target'],
                    'status': 'error',
                    'error': str(e)
                }
        
        report = {
            'report_timestamp': datetime.utcnow().isoformat(),
            'time_range': time_range,
            'start_time': start_time.isoformat(),
            'end_time': datetime.utcnow().isoformat(),
            'metrics_by_type': {
                metric_type: [
                    {
                        'name': m.name,
                        'value': m.value,
                        'unit': m.unit,
                        'timestamp': m.timestamp.isoformat(),
                        'tags': m.tags
                    }
                    for m in metrics
                ]
                for metric_type, metrics in metrics_by_type.items()
            },
            'kpis': kpi_data,
            'trending': self.calculate_trends(relevant_metrics),
            'alerts': self.check_alerts(relevant_metrics),
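            # The summary helpers index metrics like dicts (m['name'], m['value']),
            # so pass plain dicts rather than SecurityMetric objects
            'summary': self.generate_summary(
                {
                    metric_type: [{'name': m.name, 'value': m.value} for m in metrics]
                    for metric_type, metrics in metrics_by_type.items()
                },
                kpi_data
            )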
        }
        
        return report
    
    def calculate_trends(self, metrics: List[SecurityMetric]) -> Dict[str, Any]:
        """
        Calculate trends for security metrics
        """
        trends = {}
        
        # Group metrics by name
        metrics_by_name = {}
        for metric in metrics:
            if metric.name not in metrics_by_name:
                metrics_by_name[metric.name] = []
            metrics_by_name[metric.name].append(metric)
        
        # Calculate trends for each metric
        for metric_name, metric_list in metrics_by_name.items():
            if len(metric_list) < 2:
                continue
            
            # Sort by timestamp
            sorted_metrics = sorted(metric_list, key=lambda m: m.timestamp)
            values = [m.value for m in sorted_metrics]
            
            # Determine trend direction by comparing the first and last values
            # (a coarse approximation, not a regression over all points)
            if len(values) >= 2:
                first_value = values[0]
                last_value = values[-1]
                trend_direction = 'increasing' if last_value > first_value else 'decreasing' if last_value < first_value else 'stable'
                
                trends[metric_name] = {
                    'direction': trend_direction,
                    'first_value': first_value,
                    'last_value': last_value,
                    'change': last_value - first_value,
                    'change_percentage': ((last_value - first_value) / first_value * 100) if first_value != 0 else 0
                }
        
        return trends
    
    def check_alerts(self, metrics: List[SecurityMetric]) -> List[Dict[str, Any]]:
        """
        Check for metrics that exceed alert thresholds
        """
        alerts = []
        
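        for metric in metrics:
            # Resolve which thresholds apply to this metric
            if metric.name.startswith('vulnerability_count_'):
                # Compare each severity count against that severity's own limit
                severity = metric.name[len('vulnerability_count_'):]
                limits = self.alert_thresholds['vulnerability_count']
                thresholds = {severity: limits[severity]} if severity in limits else {}
            else:
                thresholds = self.alert_thresholds.get(metric.name, {})
            
            for level, threshold_value in thresholds.items():
                # A compliance score breaches when it falls below the minimum;
                # every other metric breaches when it rises above the threshold
                if level == 'minimum_acceptable':
                    breached = metric.value < threshold_value
                else:
                    breached = metric.value > threshold_value
                
                if breached:
                    alerts.append({
                        'metric_name': metric.name,
                        'current_value': metric.value,
                        'threshold_level': level,
                        'threshold_value': threshold_value,
                        'timestamp': metric.timestamp.isoformat(),
                        'severity': 'high' if level in ['critical', 'daily_threshold'] else 'medium'
                    })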
        
        return alerts
    
    def generate_summary(self, metrics_by_type: Dict[str, List[Dict[str, Any]]], kpis: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """
        Generate summary of security metrics
        """
        summary = {
            'total_metrics': sum(len(metrics) for metrics in metrics_by_type.values()),
            'kpis_met': len([k for k, v in kpis.items() if v.get('status') == 'met']),
            'kpis_total': len(kpis),
            'compliance_score': kpis.get('compliance_score', {}).get('value', 0),
            'vulnerability_status': self.calculate_vulnerability_summary(metrics_by_type.get('vulnerability', [])),
            'incident_status': self.calculate_incident_summary(metrics_by_type.get('incident', [])),
            'trend_summary': self.calculate_trend_summary(metrics_by_type)
        }
        
        return summary
    
    def calculate_vulnerability_summary(self, vulnerability_metrics: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Calculate vulnerability summary
        """
        critical = sum(m['value'] for m in vulnerability_metrics if 'critical' in m['name'])
        high = sum(m['value'] for m in vulnerability_metrics if 'high' in m['name'])
        medium = sum(m['value'] for m in vulnerability_metrics if 'medium' in m['name'])
        low = sum(m['value'] for m in vulnerability_metrics if 'low' in m['name'])
        
        return {
            'critical': critical,
            'high': high,
            'medium': medium,
            'low': low,
            'total': critical + high + medium + low,
            'status': 'high_risk' if critical > 0 or high > 10 else 'medium_risk' if high > 0 or medium > 20 else 'low_risk'
        }
    
    def calculate_incident_summary(self, incident_metrics: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Calculate incident summary
        """
        critical = sum(m['value'] for m in incident_metrics if 'critical' in m['name'])
        high = sum(m['value'] for m in incident_metrics if 'high' in m['name'])
        medium = sum(m['value'] for m in incident_metrics if 'medium' in m['name'])
        low = sum(m['value'] for m in incident_metrics if 'low' in m['name'])
        
        return {
            'critical': critical,
            'high': high,
            'medium': medium,
            'low': low,
            'total': critical + high + medium + low,
            'status': 'critical' if critical > 0 else 'high' if high > 0 else 'medium' if medium > 0 else 'low'
        }
    
    def calculate_trend_summary(self, metrics_by_type: Dict[str, List[Dict[str, Any]]]) -> Dict[str, str]:
        """
        Calculate overall trend summary
        """
        positive_trends = 0
        negative_trends = 0
        
        for metric_list in metrics_by_type.values():
            for metric in metric_list:
                # This would be calculated based on actual trend data
                # For now, we'll use a simple heuristic
                if 'density' in metric['name'] or 'count' in metric['name']:
                    # Lower is better for these metrics
                    if metric['value'] < 10:  # arbitrary threshold
                        positive_trends += 1
                    else:
                        negative_trends += 1
        
        return {
            'positive_trends': positive_trends,
            'negative_trends': negative_trends,
            'overall_trend': 'improving' if positive_trends > negative_trends else 'declining'
        }

# Example usage
def run_security_metrics_example():
    """
    Example of running security metrics collection
    """
    collector = SecurityMetricsCollector()
    
    # Simulate collecting different types of metrics
    # Vulnerability metrics
    vuln_scan_results = {
        'vulnerability_summary': {
            'critical': 2,
            'high': 8,
            'medium': 15,
            'low': 25
        },
        'code_lines_scanned': 50000
    }
    
    vuln_metrics = collector.collect_vulnerability_metrics(vuln_scan_results)
    for metric in vuln_metrics:
        collector.metrics.append(metric)
    
    # Incident metrics
    incident_data = {
        'incidents': [
            {
                'severity': 'high',
                'occurred_at': (datetime.utcnow() - timedelta(hours=2)).isoformat(),
                'detected_at': (datetime.utcnow() - timedelta(hours=1)).isoformat(),
                'resolved_at': datetime.utcnow().isoformat()
            },
            {
                'severity': 'medium',
                'occurred_at': (datetime.utcnow() - timedelta(hours=10)).isoformat(),
                'detected_at': (datetime.utcnow() - timedelta(hours=8)).isoformat(),
                'resolved_at': (datetime.utcnow() - timedelta(hours=5)).isoformat()
            }
        ]
    }
    
    incident_metrics = collector.collect_incident_metrics(incident_data)
    for metric in incident_metrics:
        collector.metrics.append(metric)
    
    # Generate report
    report = collector.generate_security_report('24h')
    
    print("Security Report Summary:")
    print(f"  Compliance Score: {report['summary']['compliance_score']:.2f}%")
    print(f"  Vulnerability Status: {report['summary']['vulnerability_status']['status']}")
    print(f"  Incident Status: {report['summary']['incident_status']['status']}")
    print(f"  KPIs Met: {report['summary']['kpis_met']}/{report['summary']['kpis_total']}")
    
    return report

# Run example
# report = run_security_metrics_example()
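
As a check on the collector's arithmetic, the sample inputs from the example above can be worked through by hand. The following is a minimal sketch that recomputes vulnerability density, mean time to detect, and mean time to remediate outside the class. The fixed reference time `now` and the helper `hours_between` are assumptions introduced for illustration; the example above uses `datetime.utcnow()` instead.

```python
from datetime import datetime, timedelta
from statistics import mean

# Hypothetical fixed reference time so the arithmetic is reproducible.
now = datetime(2024, 1, 1, 12, 0, 0)

# Same counts and time offsets as the sample data in the example above.
vulnerability_summary = {"critical": 2, "high": 8, "medium": 15, "low": 25}
code_lines_scanned = 50_000
incidents = [
    {"occurred_at": now - timedelta(hours=2),
     "detected_at": now - timedelta(hours=1),
     "resolved_at": now},
    {"occurred_at": now - timedelta(hours=10),
     "detected_at": now - timedelta(hours=8),
     "resolved_at": now - timedelta(hours=5)},
]


def hours_between(start: datetime, end: datetime) -> float:
    """Elapsed time between two datetimes, in hours."""
    return (end - start).total_seconds() / 3600


# 2 + 8 + 15 + 25 = 50 vulnerabilities
total_vulns = sum(vulnerability_summary.values())
# 50 / 50,000 lines * 1000 = 1.0 vulnerabilities per KLOC
density = total_vulns / code_lines_scanned * 1000
# Detection delays are 1 h and 2 h, so the mean is 1.5 h
mttd = mean([hours_between(i["occurred_at"], i["detected_at"]) for i in incidents])
# Remediation delays are 1 h and 3 h, so the mean is 2.0 h
mttr = mean([hours_between(i["detected_at"], i["resolved_at"]) for i in incidents])

print(f"density={density:.2f}/KLOC  MTTD={mttd:.1f}h  MTTR={mttr:.1f}h")
```

With these inputs the detection mean of 1.5 hours sits above the 1.0 hour MTTD target defined in `define_kpi_definitions`, while the remediation mean of 2.0 hours is well inside the 24 hour target.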

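The `calculate_trends` method above only compares the first and last samples, so a single outlier at either end decides the direction. One possible alternative, shown here as a sketch rather than as part of the original collector, is an ordinary least-squares slope over all samples. The function name `trend_slope` and the choice of hours as the time unit are assumptions made for this illustration.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def trend_slope(points: List[Tuple[datetime, float]]) -> float:
    """Least-squares slope of value over time, in value units per hour.

    Returns 0.0 when fewer than two samples exist or when all samples
    share the same timestamp (the slope is undefined in that case).
    """
    if len(points) < 2:
        return 0.0

    ordered = sorted(points, key=lambda p: p[0])
    t0 = ordered[0][0]
    # Express each timestamp as hours elapsed since the first sample.
    xs = [(t - t0).total_seconds() / 3600 for t, _ in ordered]
    ys = [v for _, v in ordered]

    x_mean = sum(xs) / len(xs)
    y_mean = sum(ys) / len(ys)
    denominator = sum((x - x_mean) ** 2 for x in xs)
    if denominator == 0:
        return 0.0

    numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    return numerator / denominator


# Usage: a count that drops by 2 every hour yields a slope of -2.0.
base = datetime(2024, 1, 1)
samples = [(base + timedelta(hours=h), v) for h, v in [(0, 10.0), (1, 8.0), (2, 6.0)]]
print(trend_slope(samples))  # -2.0
```

A negative slope on a count or density metric indicates improvement, while a negative slope on a coverage or compliance percentage indicates regression, so the sign still has to be interpreted per metric.
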
Security Scorecards

A security scorecard rolls per-category scores for a team, application, or infrastructure into a single weighted overall score:

PYTHON
# Example: Security scorecard system
from dataclasses import dataclass
from typing import Dict, List, Any
import json
from datetime import datetime

@dataclass
class SecurityScorecard:
    """
    Security scorecard for teams/projects
    """
    team_name: str
    project_name: str
    scorecard_date: datetime
    categories: Dict[str, Dict[str, Any]]
    overall_score: float
    risk_level: str
    recommendations: List[str]

class SecurityScorecardGenerator:
    """
    Generate security scorecards for teams and projects
    """
    
    def __init__(self):
        self.scorecard_templates = self.define_scorecard_templates()
        self.scoring_weights = self.define_scoring_weights()
    
    def define_scorecard_templates(self) -> Dict[str, Dict[str, Any]]:
        """
        Define security scorecard templates
        """
        return {
            'development_team': {
                'categories': [
                    'Secure Coding Practices',
                    'Security Testing',
                    'Vulnerability Management',
                    'Security Awareness',
                    'Compliance'
                ],
                'max_score': 100
            },
            'application': {
                'categories': [
                    'Code Security',
                    'Dependency Security',
                    'Runtime Security',
                    'Configuration Security',
                    'Monitoring & Logging'
                ],
                'max_score': 100
            },
            'infrastructure': {
                'categories': [
                    'Network Security',
                    'Identity & Access Management',
                    'Data Protection',
                    'Infrastructure Security',
                    'Compliance'
                ],
                'max_score': 100
            }
        }
    
    def define_scoring_weights(self) -> Dict[str, Dict[str, float]]:
        """
        Define scoring weights for different categories
        """
        return {
            'development_team': {
                'Secure Coding Practices': 0.25,
                'Security Testing': 0.25,
                'Vulnerability Management': 0.20,
                'Security Awareness': 0.20,
                'Compliance': 0.10
            },
            'application': {
                'Code Security': 0.30,
                'Dependency Security': 0.25,
                'Runtime Security': 0.20,
                'Configuration Security': 0.15,
                'Monitoring & Logging': 0.10
            },
            'infrastructure': {
                'Network Security': 0.25,
                'Identity & Access Management': 0.25,
                'Data Protection': 0.20,
                'Infrastructure Security': 0.20,
                'Compliance': 0.10
            }
        }
    
    def calculate_team_scorecard(self, team_data: Dict[str, Any]) -> SecurityScorecard:
        """
        Calculate security scorecard for a development team
        """
        categories = {}
        
        # Secure Coding Practices (25%)
        secure_coding_score = self.calculate_secure_coding_score(team_data)
        categories['Secure Coding Practices'] = {
            'score': secure_coding_score,
            'weight': 0.25,
            'details': team_data.get('secure_coding_practices', {})
        }
        
        # Security Testing (25%)
        security_testing_score = self.calculate_security_testing_score(team_data)
        categories['Security Testing'] = {
            'score': security_testing_score,
            'weight': 0.25,
            'details': team_data.get('security_testing', {})
        }
        
        # Vulnerability Management (20%)
        vulnerability_mgmt_score = self.calculate_vulnerability_management_score(team_data)
        categories['Vulnerability Management'] = {
            'score': vulnerability_mgmt_score,
            'weight': 0.20,
            'details': team_data.get('vulnerability_management', {})
        }
        
        # Security Awareness (20%)
        security_awareness_score = self.calculate_security_awareness_score(team_data)
        categories['Security Awareness'] = {
            'score': security_awareness_score,
            'weight': 0.20,
            'details': team_data.get('security_awareness', {})
        }
        
        # Compliance (10%)
        compliance_score = self.calculate_compliance_score(team_data)
        categories['Compliance'] = {
            'score': compliance_score,
            'weight': 0.10,
            'details': team_data.get('compliance', {})
        }
        
        # Calculate overall score
        overall_score = sum(
            cat_data['score'] * cat_data['weight'] 
            for cat_data in categories.values()
        )
        
        # Determine risk level
        risk_level = self.determine_risk_level(overall_score)
        
        # Generate recommendations
        recommendations = self.generate_recommendations(categories, overall_score)
        
        return SecurityScorecard(
            team_name=team_data.get('team_name', 'Unknown'),
            project_name=team_data.get('project_name', 'Unknown'),
            scorecard_date=datetime.utcnow(),
            categories=categories,
            overall_score=overall_score,
            risk_level=risk_level,
            recommendations=recommendations
        )
    
    def calculate_secure_coding_score(self, team_data: Dict[str, Any]) -> float:
        """
        Calculate secure coding practices score
        """
        practices = team_data.get('secure_coding_practices', {})
        
        score = 0
        total_possible = 0
        
        # Code review with security focus
        if practices.get('security_code_reviews', False):
            score += 20
        total_possible += 20
        
        # Static analysis tools usage
        if practices.get('static_analysis_tools', False):
            score += 20
        total_possible += 20
        
        # Secure coding training
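        trained_developers = practices.get('trained_developers', 0)
        total_developers = practices.get('total_developers', 1)
        # Guard against an empty team and cap the contribution at 20 points
        training_score = min(20, (trained_developers / total_developers) * 20) if total_developers > 0 else 0
        score += training_score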
        total_possible += 20
        
        # Security requirements in user stories
        if practices.get('security_requirements', False):
            score += 20
        total_possible += 20
        
        # Threat modeling
        if practices.get('threat_modeling', False):
            score += 20
        total_possible += 20
        
        return min(100, (score / total_possible) * 100) if total_possible > 0 else 0
    
    def calculate_security_testing_score(self, team_data: Dict[str, Any]) -> float:
        """
        Calculate security testing score
        """
        testing = team_data.get('security_testing', {})
        
        score = 0
        total_possible = 0
        
        # Automated security testing in CI/CD
        if testing.get('ci_cd_security_tests', False):
            score += 25
        total_possible += 25
        
        # Dynamic application security testing
        if testing.get('dast_scans', False):
            score += 20
        total_possible += 20
        
        # Interactive application security testing
        if testing.get('iast_implementation', False):
            score += 15
        total_possible += 15
        
        # Penetration testing
        if testing.get('penetration_testing', False):
            score += 20
        total_possible += 20
        
        # Security test coverage
        coverage = testing.get('security_test_coverage', 0)
        coverage_score = min(20, (coverage / 100) * 20)
        score += coverage_score
        total_possible += 20
        
        return min(100, (score / total_possible) * 100) if total_possible > 0 else 0
    
    def calculate_vulnerability_management_score(self, team_data: Dict[str, Any]) -> float:
        """
        Calculate vulnerability management score
        """
        vuln_mgmt = team_data.get('vulnerability_management', {})
        
        score = 0
        total_possible = 0
        
        # Time to fix critical vulnerabilities
        critical_fix_time = vuln_mgmt.get('mean_time_to_fix_critical', float('inf'))
        if critical_fix_time <= 24:  # hours
            score += 25
        elif critical_fix_time <= 72:
            score += 15
        elif critical_fix_time <= 168:  # 1 week
            score += 5
        total_possible += 25
        
        # Time to fix high vulnerabilities
        high_fix_time = vuln_mgmt.get('mean_time_to_fix_high', float('inf'))
        if high_fix_time <= 72:  # hours
            score += 20
        elif high_fix_time <= 168:  # 1 week
            score += 10
        total_possible += 20
        
        # Vulnerability remediation rate
        remediation_rate = vuln_mgmt.get('remediation_rate', 0)
        remediation_score = min(20, (remediation_rate / 100) * 20)
        score += remediation_score
        total_possible += 20
        
        # Vulnerability tracking system
        if vuln_mgmt.get('tracking_system', False):
            score += 15
        total_possible += 15
        
        # Regular vulnerability assessments
        if vuln_mgmt.get('regular_assessments', False):
            score += 20
        total_possible += 20
        
        return min(100, (score / total_possible) * 100) if total_possible > 0 else 0
    
    def calculate_security_awareness_score(self, team_data: Dict[str, Any]) -> float:
        """
        Calculate security awareness score
        """
        awareness = team_data.get('security_awareness', {})
        
        score = 0
        total_possible = 0
        
        # Security training completion
        training_completion = awareness.get('training_completion_rate', 0)
        training_score = min(30, (training_completion / 100) * 30)
        score += training_score
        total_possible += 30
        
        # Security champions program
        if awareness.get('security_champions', False):
            score += 20
        total_possible += 20
        
        # Security awareness campaigns
        if awareness.get('awareness_campaigns', False):
            score += 15
        total_possible += 15
        
        # Security incident reporting
        if awareness.get('incident_reporting', False):
            score += 20
        total_possible += 20
        
        # Security best practices documentation
        if awareness.get('best_practices_documentation', False):
            score += 15
        total_possible += 15
        
        return min(100, (score / total_possible) * 100) if total_possible > 0 else 0
    
    def calculate_compliance_score(self, team_data: Dict[str, Any]) -> float:
        """
        Calculate compliance score
        """
        compliance = team_data.get('compliance', {})
        
        score = 0
        total_possible = 0
        
        # Compliance framework adherence
        framework_adherence = compliance.get('framework_adherence', 0)
        framework_score = min(30, (framework_adherence / 100) * 30)
        score += framework_score
        total_possible += 30
        
        # Audit results
        audit_score = compliance.get('latest_audit_score', 0)
        audit_score_scaled = min(25, (audit_score / 100) * 25)
        score += audit_score_scaled
        total_possible += 25
        
        # Policy compliance
        policy_compliance = compliance.get('policy_compliance_rate', 0)
        policy_score = min(20, (policy_compliance / 100) * 20)
        score += policy_score
        total_possible += 20
        
        # Compliance automation
        if compliance.get('compliance_automation', False):
            score += 15
        total_possible += 15
        
        # Regular compliance reviews
        if compliance.get('regular_reviews', False):
            score += 10
        total_possible += 10
        
        return min(100, (score / total_possible) * 100) if total_possible > 0 else 0
    
    def determine_risk_level(self, overall_score: float) -> str:
        """
        Determine risk level based on overall score
        """
        if overall_score >= 90:
            return 'low'
        elif overall_score >= 70:
            return 'medium'
        elif overall_score >= 50:
            return 'high'
        else:
            return 'critical'
    
    def generate_recommendations(self, categories: Dict[str, Dict[str, Any]], overall_score: float) -> List[str]:
        """
        Generate recommendations based on scorecard results
        """
        recommendations = []
        
        # Add recommendations for categories with low scores
        for category_name, category_data in categories.items():
            if category_data['score'] < 70:
                recommendations.append(
                    f"Improve {category_name} (current score: {category_data['score']:.1f})"
                )
        
        # Add general recommendations based on overall score
        if overall_score < 70:
            recommendations.append("Implement comprehensive security training program")
            recommendations.append("Increase security testing automation")
            recommendations.append("Establish formal vulnerability management process")
        
        if overall_score < 50:
            recommendations.append("Conduct immediate security assessment")
            recommendations.append("Implement security champions program")
            recommendations.append("Review and update security policies")
        
        return recommendations
    
    def generate_scorecard_report(self, scorecard: SecurityScorecard) -> Dict[str, Any]:
        """
        Generate detailed scorecard report
        """
        report = {
            'scorecard_metadata': {
                'team_name': scorecard.team_name,
                'project_name': scorecard.project_name,
                'scorecard_date': scorecard.scorecard_date.isoformat(),
                'overall_score': scorecard.overall_score,
                'risk_level': scorecard.risk_level
            },
            'category_scores': {
                category_name: {
                    'score': category_data['score'],
                    'weight': category_data['weight'],
                    'details': category_data['details']
                }
                for category_name, category_data in scorecard.categories.items()
            },
            'recommendations': scorecard.recommendations,
            'benchmark_comparison': self.compare_to_benchmark(scorecard),
            'trend_analysis': self.analyze_trends(scorecard)
        }
        
        return report
    
    def compare_to_benchmark(self, scorecard: SecurityScorecard) -> Dict[str, Any]:
        """
        Compare scorecard to industry benchmarks
        """
        # Industry average benchmarks (these would come from real data in practice)
        benchmarks = {
            'Secure Coding Practices': 75.0,
            'Security Testing': 68.0,
            'Vulnerability Management': 72.0,
            'Security Awareness': 70.0,
            'Compliance': 85.0
        }
        
        comparison = {}
        for category_name, category_data in scorecard.categories.items():
            benchmark_score = benchmarks.get(category_name, 70.0)
            difference = category_data['score'] - benchmark_score
            comparison[category_name] = {
                'team_score': category_data['score'],
                'benchmark_score': benchmark_score,
                'difference': difference,
                # A score equal to the benchmark is reported as 'at', not 'below'
                'performance': 'above' if difference > 0 else 'below' if difference < 0 else 'at'
            }
        
        return comparison
    
    def analyze_trends(self, scorecard: SecurityScorecard) -> Dict[str, Any]:
        """
        Analyze trends for the scorecard
        """
        # This would typically compare to previous scorecards
        # For this example, we'll provide a mock analysis
        return {
            'overall_trend': 'improving',
            'category_trends': {
                category_name: 'stable' for category_name in scorecard.categories.keys()
            },
            'improvement_areas': [
                'Vulnerability Management',
                'Security Awareness'
            ],
            'strengths': [
                'Compliance'
            ]
        }

# Example usage
def generate_security_scorecard():
    """
    Example of generating a security scorecard
    """
    generator = SecurityScorecardGenerator()
    
    # Sample team data
    team_data = {
        'team_name': 'Web Development Team',
        'project_name': 'Customer Portal',
        'secure_coding_practices': {
            'security_code_reviews': True,
            'static_analysis_tools': True,
            'trained_developers': 8,
            'total_developers': 10,
            'security_requirements': True,
            'threat_modeling': False
        },
        'security_testing': {
            'ci_cd_security_tests': True,
            'dast_scans': True,
            'iast_implementation': False,
            'penetration_testing': True,
            'security_test_coverage': 65
        },
        'vulnerability_management': {
            'mean_time_to_fix_critical': 18,
            'mean_time_to_fix_high': 45,
            'remediation_rate': 85,
            'tracking_system': True,
            'regular_assessments': True
        },
        'security_awareness': {
            'training_completion_rate': 90,
            'security_champions': True,
            'awareness_campaigns': True,
            'incident_reporting': True,
            'best_practices_documentation': True
        },
        'compliance': {
            'framework_adherence': 95,
            'latest_audit_score': 88,
            'policy_compliance_rate': 92,
            'compliance_automation': True,
            'regular_reviews': True
        }
    }
    
    # Generate scorecard
    scorecard = generator.calculate_team_scorecard(team_data)
    
    # Generate detailed report
    report = generator.generate_scorecard_report(scorecard)
    
    print(f"Security Scorecard for {scorecard.team_name}")
    print(f"Overall Score: {scorecard.overall_score:.2f}")
    print(f"Risk Level: {scorecard.risk_level}")
    print(f"Recommendations: {len(scorecard.recommendations)}")
    
    for i, rec in enumerate(scorecard.recommendations, 1):
        print(f"  {i}. {rec}")
    
    return report

# Run example
# report = generate_security_scorecard()
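
The `analyze_trends` method above returns hard-coded values and notes that a real implementation would compare against previous scorecards. A minimal sketch of that comparison follows. The function names, the `tolerance` parameter, and the sample scores are illustrative assumptions, not part of the framework above; the inputs are assumed to be plain mappings of category name to score on the same 0-100 scale.

```python
from typing import Dict


def classify_trend(previous: float, current: float, tolerance: float = 2.0) -> str:
    """Label the change between two scores on the 0-100 scale."""
    delta = current - previous
    if delta > tolerance:
        return 'improving'
    if delta < -tolerance:
        return 'declining'
    # Changes within the tolerance band are treated as noise
    return 'stable'


def compare_category_scores(previous: Dict[str, float],
                            current: Dict[str, float],
                            tolerance: float = 2.0) -> Dict[str, str]:
    """Classify every category that appears in the current scorecard."""
    trends = {}
    for name, score in current.items():
        if name not in previous:
            trends[name] = 'new'  # no earlier score to compare against
        else:
            trends[name] = classify_trend(previous[name], score, tolerance)
    return trends


# Illustrative scores only; real values would come from stored scorecards
previous_scores = {'Security Testing': 60.0, 'Compliance': 93.0}
current_scores = {
    'Security Testing': 73.0,
    'Compliance': 92.4,
    'Security Awareness': 97.0,
}
trends = compare_category_scores(previous_scores, current_scores)
print(trends)
```

The tolerance band keeps small fluctuations from being reported as a trend; the right width depends on how noisy the underlying inputs are.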

Security Dashboards

Real-time Security Dashboards

Dashboard Data Collection

PYTHON
# Example: Security dashboard data collector
import asyncio
import aiohttp
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import time
import threading
from dataclasses import dataclass

@dataclass
class DashboardMetric:
    name: str
    value: float
    timestamp: datetime
    source: str
    tags: Optional[Dict[str, str]] = None

class SecurityDashboardCollector:
    """
    Collect real-time security metrics for dashboard
    """
    
    def __init__(self):
        self.metrics = []
        self.dashboard_data = {}
        self.data_sources = self.configure_data_sources()
        self.is_collecting = False
        self.collection_thread = None
    
    def configure_data_sources(self) -> Dict[str, Dict[str, Any]]:
        """
        Configure data sources for dashboard metrics
        """
        return {
            'vulnerability_scanner': {
                'type': 'api',
                'endpoint': 'http://vulnerability-scanner/api/metrics',
                'interval': 300,  # 5 minutes
                'metrics': ['vulnerability_count', 'vulnerability_trends']
            },
            'siem_system': {
                'type': 'api',
                'endpoint': 'http://siem/api/events',
                'interval': 60,  # 1 minute
                'metrics': ['security_events', 'incident_count']
            },
            'compliance_checker': {
                'type': 'api',
                'endpoint': 'http://compliance/api/status',
                'interval': 1800,  # 30 minutes
                'metrics': ['compliance_score', 'policy_violations']
            },
            'network_monitor': {
                'type': 'api',
                'endpoint': 'http://network-monitor/api/traffic',
                'interval': 30,  # 30 seconds
                'metrics': ['suspicious_traffic', 'blocked_connections']
            }
        }
    
    async def collect_from_source(self, source_name: str, source_config: Dict[str, Any]):
        """
        Collect metrics from a specific data source
        """
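        try:
            # Bound each request so one unresponsive source cannot stall the pass
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session: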
                async with session.get(source_config['endpoint']) as response:
                    if response.status == 200:
                        data = await response.json()
                        
                        # Process the data and create metrics
                        for metric_name in source_config['metrics']:
                            if metric_name in data:
                                metric = DashboardMetric(
                                    name=metric_name,
                                    value=data[metric_name],
                                    timestamp=datetime.utcnow(),
                                    source=source_name,
                                    tags={'source': source_name}
                                )
                                self.metrics.append(metric)
                                
                                # Update dashboard data
                                self.dashboard_data[metric_name] = {
                                    'value': metric.value,
                                    'timestamp': metric.timestamp.isoformat(),
                                    'source': metric.source
                                }
        
        except Exception as e:
            print(f"Error collecting from {source_name}: {str(e)}")
    
    async def collect_all_data(self):
        """
        Collect data from all configured sources
        """
        tasks = []
        
        for source_name, source_config in self.data_sources.items():
            task = asyncio.create_task(self.collect_from_source(source_name, source_config))
            tasks.append(task)
        
        await asyncio.gather(*tasks, return_exceptions=True)
    
    def start_collection(self):
        """
        Start continuous metric collection
        """
        if self.is_collecting:
            return
        
        self.is_collecting = True
        self.collection_thread = threading.Thread(target=self._collection_loop)
        self.collection_thread.daemon = True
        self.collection_thread.start()
    
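    def _collection_loop(self):
        """
        Main collection loop
        """
        while self.is_collecting:
            try:
                # Run one collection pass on a fresh event loop in this thread
                asyncio.run(self.collect_all_data())
                
                # Every source is polled at the shortest configured interval;
                # the per-source 'interval' values are not honored here
                delay = 30
                
            except Exception as e:
                print(f"Error in collection loop: {str(e)}")
                delay = 60  # Wait longer on error
            
            # Sleep in one-second steps so stop_collection() returns promptly
            for _ in range(delay):
                if not self.is_collecting:
                    break
                time.sleep(1)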
    
    def stop_collection(self):
        """
        Stop metric collection
        """
        self.is_collecting = False
        if self.collection_thread:
            self.collection_thread.join()
    
    def get_current_dashboard_data(self) -> Dict[str, Any]:
        """
        Get current dashboard data
        """
        return {
            'timestamp': datetime.utcnow().isoformat(),
            'metrics': self.dashboard_data,
            'metric_history': self.get_recent_metrics(100),  # Last 100 metrics
            'alerts': self.get_active_alerts(),
            'summary': self.get_dashboard_summary()
        }
    
    def get_recent_metrics(self, limit: int = 50) -> List[Dict[str, Any]]:
        """
        Get recent metrics for trend analysis
        """
        recent_metrics = sorted(self.metrics, key=lambda m: m.timestamp, reverse=True)[:limit]
        
        return [
            {
                'name': m.name,
                'value': m.value,
                'timestamp': m.timestamp.isoformat(),
                'source': m.source,
                'tags': m.tags
            }
            for m in recent_metrics
        ]
    
    def get_active_alerts(self) -> List[Dict[str, Any]]:
        """
        Get active security alerts
        """
        alerts = []
        
        # Check for high vulnerability counts
        vuln_count = self.dashboard_data.get('vulnerability_count', {}).get('value', 0)
        if vuln_count > 50:  # threshold
            alerts.append({
                'id': 'high_vulnerability_count',
                'severity': 'high',
                'message': f'High number of vulnerabilities detected: {vuln_count}',
                'timestamp': datetime.utcnow().isoformat()
            })
        
        # Check for security incidents
        incident_count = self.dashboard_data.get('incident_count', {}).get('value', 0)
        if incident_count > 5:  # threshold
            alerts.append({
                'id': 'high_incident_count',
                'severity': 'medium',
                'message': f'High number of security incidents: {incident_count}',
                'timestamp': datetime.utcnow().isoformat()
            })
        
        # Check for compliance violations
        violations = self.dashboard_data.get('policy_violations', {}).get('value', 0)
        if violations > 10:  # threshold
            alerts.append({
                'id': 'high_policy_violations',
                'severity': 'medium',
                'message': f'High number of policy violations: {violations}',
                'timestamp': datetime.utcnow().isoformat()
            })
        
        return alerts
    
    def get_dashboard_summary(self) -> Dict[str, Any]:
        """
        Get dashboard summary
        """
        return {
            'total_vulnerabilities': self.dashboard_data.get('vulnerability_count', {}).get('value', 0),
            'security_incidents': self.dashboard_data.get('incident_count', {}).get('value', 0),
            'compliance_score': self.dashboard_data.get('compliance_score', {}).get('value', 100),
            'active_alerts': len(self.get_active_alerts()),
            'last_update': datetime.utcnow().isoformat()
        }

# Example dashboard API
from flask import Flask, jsonify

app = Flask(__name__)
dashboard_collector = SecurityDashboardCollector()

@app.route('/api/dashboard/realtime')
def get_realtime_dashboard():
    """
    API endpoint for real-time dashboard data
    """
    dashboard_data = dashboard_collector.get_current_dashboard_data()
    return jsonify(dashboard_data)

@app.route('/api/dashboard/summary')
def get_dashboard_summary():
    """
    API endpoint for dashboard summary
    """
    summary = dashboard_collector.get_dashboard_summary()
    return jsonify(summary)

@app.route('/api/dashboard/alerts')
def get_dashboard_alerts():
    """
    API endpoint for active alerts
    """
    alerts = dashboard_collector.get_active_alerts()
    return jsonify(alerts)

@app.route('/api/dashboard/trends')
def get_dashboard_trends():
    """
    API endpoint for metric trends
    """
    trends = dashboard_collector.get_recent_metrics(50)
    return jsonify(trends)

# Example usage
def run_dashboard_example():
    """
    Example of running the dashboard collector
    """
    # Start the collector
    dashboard_collector.start_collection()
    
    try:
        # Let it run for a while to collect data
        time.sleep(10)
        
        # Get current dashboard data
        dashboard_data = dashboard_collector.get_current_dashboard_data()
        
        print("Dashboard Summary:")
        print(f"  Total Vulnerabilities: {dashboard_data['summary']['total_vulnerabilities']}")
        print(f"  Security Incidents: {dashboard_data['summary']['security_incidents']}")
        print(f"  Compliance Score: {dashboard_data['summary']['compliance_score']}")
        print(f"  Active Alerts: {dashboard_data['summary']['active_alerts']}")
        
        print("\nActive Alerts:")
        for alert in dashboard_data['alerts']:
            print(f"  - {alert['severity'].upper()}: {alert['message']}")
    
    finally:
        # Stop the collector
        dashboard_collector.stop_collection()

# Run example (commented out to prevent actual API startup)
# run_dashboard_example()
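
The data source configuration above gives each source its own `interval`, but the collection loop polls every source on the same fixed cadence. One way to honor the configured intervals is to track when each source was last polled and select only the sources that are due. The sketch below assumes a helper named `due_sources` and a `last_run` mapping, neither of which exists in the collector above.

```python
from typing import Any, Dict, List, Optional


def due_sources(sources: Dict[str, Dict[str, Any]],
                last_run: Dict[str, float],
                now: float) -> List[str]:
    """Return the names of sources whose interval has elapsed at time `now`."""
    due = []
    for name, config in sources.items():
        last: Optional[float] = last_run.get(name)
        # A source that has never been polled is due immediately
        if last is None or now - last >= config['interval']:
            due.append(name)
    return due


# Intervals copied from the configuration above; timestamps are illustrative
sources = {
    'siem_system': {'interval': 60},
    'vulnerability_scanner': {'interval': 300},
}
last_run = {'siem_system': 1000.0, 'vulnerability_scanner': 1000.0}

# 90 seconds later only the one-minute source is due
ready = due_sources(sources, last_run, now=1090.0)
print(ready)
```

Inside a collection loop, `now` could come from `time.monotonic()`, with `last_run[name]` updated after each successful poll so that slow sources such as the compliance checker are not queried every pass.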

Dashboard Visualization

Dashboard Frontend Integration

HTML
<!-- Example: Security dashboard HTML template -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>DevSecOps Security Dashboard</title>
    <!-- Pin a Plotly release; the plotly-latest bundle is frozen at an old 1.x version -->
    <script src="https://cdn.plot.ly/plotly-2.35.2.min.js"></script>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background-color: #f5f5f5;
        }
        .dashboard-container {
            max-width: 1200px;
            margin: 0 auto;
        }
        .dashboard-header {
            background-color: #2c3e50;
            color: white;
            padding: 20px;
            border-radius: 8px;
            margin-bottom: 20px;
        }
        .metrics-grid {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
            gap: 20px;
            margin-bottom: 20px;
        }
        .metric-card {
            background-color: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .metric-value {
            font-size: 2em;
            font-weight: bold;
            color: #3498db;
        }
        .metric-title {
            font-size: 1em;
            color: #7f8c8d;
            margin-bottom: 10px;
        }
        .chart-container {
            background-color: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
            margin-bottom: 20px;
        }
        .alerts-section {
            background-color: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .alert-item {
            padding: 10px;
            margin: 5px 0;
            border-radius: 4px;
            border-left: 4px solid #e74c3c;
        }
        .alert-high { border-left-color: #e74c3c; }
        .alert-medium { border-left-color: #f39c12; }
        .alert-low { border-left-color: #3498db; }
    </style>
</head>
<body>
    <div class="dashboard-container">
        <div class="dashboard-header">
            <h1>DevSecOps Security Dashboard</h1>
            <p>Real-time security metrics and monitoring</p>
        </div>
        
        <div class="metrics-grid">
            <div class="metric-card">
                <div class="metric-title">Total Vulnerabilities</div>
                <div class="metric-value" id="total-vulnerabilities">0</div>
                <div class="metric-trend">↓ 12% from last week</div>
            </div>
            
            <div class="metric-card">
                <div class="metric-title">Security Incidents</div>
                <div class="metric-value" id="security-incidents">0</div>
                <div class="metric-trend">↑ 5% from last week</div>
            </div>
            
            <div class="metric-card">
                <div class="metric-title">Compliance Score</div>
                <div class="metric-value" id="compliance-score">0%</div>
                <div class="metric-trend">→ Stable</div>
            </div>
            
            <div class="metric-card">
                <div class="metric-title">Active Alerts</div>
                <div class="metric-value" id="active-alerts">0</div>
                <div class="metric-trend">↓ 20% from last week</div>
            </div>
        </div>
        
        <div class="chart-container">
            <h3>Vulnerability Trends (Last 30 Days)</h3>
            <div id="vulnerability-trend-chart" style="height: 400px;"></div>
        </div>
        
        <div class="chart-container">
            <h3>Security Incidents by Type</h3>
            <div id="incident-type-chart" style="height: 400px;"></div>
        </div>
        
        <div class="alerts-section">
            <h3>Active Security Alerts</h3>
            <div id="alerts-container">
                <p>No active alerts</p>
            </div>
        </div>
    </div>

    <script>
        // Function to fetch dashboard data
        async function fetchDashboardData() {
            try {
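                const response = await fetch('/api/dashboard/realtime');
                // fetch() resolves on HTTP error statuses too, so check explicitly
                if (!response.ok) throw new Error(`Dashboard API returned ${response.status}`);
                const data = await response.json();
                updateDashboard(data);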
            } catch (error) {
                console.error('Error fetching dashboard data:', error);
            }
        }
        
        // Function to update dashboard with new data
        function updateDashboard(data) {
            // Update metric cards
            document.getElementById('total-vulnerabilities').textContent = 
                data.summary.total_vulnerabilities;
            document.getElementById('security-incidents').textContent = 
                data.summary.security_incidents;
            document.getElementById('compliance-score').textContent = 
                data.summary.compliance_score + '%';
            document.getElementById('active-alerts').textContent = 
                data.summary.active_alerts;
            
            // Update charts
            updateVulnerabilityTrendChart(data.metric_history);
            updateIncidentTypeChart(data.metrics);
            updateAlerts(data.alerts);
        }
        
        // Function to update vulnerability trend chart
        function updateVulnerabilityTrendChart(metricHistory) {
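            // metric_history arrives newest-first, so take the first 30
            // matching entries and reverse them into chronological order
            const vulnerabilityData = metricHistory
                .filter(m => m.name === 'vulnerability_count')
                .slice(0, 30)
                .reverse();
            
            if (vulnerabilityData.length === 0) return;
            
            // Pass Date objects so points collected on the same day stay distinct
            const xValues = vulnerabilityData.map(m => new Date(m.timestamp));
            const yValues = vulnerabilityData.map(m => m.value);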
            
            const trace = {
                x: xValues,
                y: yValues,
                type: 'scatter',
                mode: 'lines+markers',
                name: 'Vulnerabilities'
            };
            
            const layout = {
                title: 'Vulnerability Trends',
                xaxis: { title: 'Date' },
                yaxis: { title: 'Number of Vulnerabilities' }
            };
            
            Plotly.newPlot('vulnerability-trend-chart', [trace], layout);
        }
        
        // Function to update incident type chart
        function updateIncidentTypeChart(metrics) {
            // Mock data for incident types
            const incidentTypes = {
                'Malware': 15,
                'Unauthorized Access': 8,
                'DDoS': 3,
                'Phishing': 12,
                'Data Breach': 2
            };
            
            const trace = {
                x: Object.keys(incidentTypes),
                y: Object.values(incidentTypes),
                type: 'bar',
                name: 'Incident Types'
            };
            
            const layout = {
                title: 'Security Incidents by Type',
                xaxis: { title: 'Incident Type' },
                yaxis: { title: 'Number of Incidents' }
            };
            
            Plotly.newPlot('incident-type-chart', [trace], layout);
        }
        
        // Function to update alerts
        function updateAlerts(alerts) {
            const alertsContainer = document.getElementById('alerts-container');
            
            if (alerts.length === 0) {
                alertsContainer.innerHTML = '<p>No active alerts</p>';
                return;
            }
            
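            // Build nodes with textContent so alert fields are never parsed as HTML
            alertsContainer.replaceChildren(...alerts.map(alert => {
                const item = document.createElement('div');
                item.className = `alert-item alert-${alert.severity}`;
                const label = document.createElement('strong');
                label.textContent = `${alert.severity.toUpperCase()}:`;
                const time = document.createElement('small');
                time.textContent = new Date(alert.timestamp).toLocaleString();
                item.append(label, ` ${alert.message}`, document.createElement('br'), time);
                return item;
            }));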
        }
        
        // Initial data fetch
        fetchDashboardData();
        
        // Update every 30 seconds
        setInterval(fetchDashboardData, 30000);
    </script>
</body>
</html>
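
The incident type chart in the template above is drawn from mock data. To feed it real numbers, the backend would need to aggregate incidents by type before returning them. The sketch below shows one way to do that aggregation; the function name and the assumption that each incident record carries a `type` field are hypothetical, since the shape of the SIEM response is not specified here.

```python
from collections import Counter
from typing import Any, Dict, List


def count_incidents_by_type(incidents: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count incidents per type, most frequent first."""
    # Records without a 'type' field are grouped under 'Unknown'
    counts = Counter(incident.get('type', 'Unknown') for incident in incidents)
    # most_common() orders by count, highest first
    return dict(counts.most_common())


# Illustrative records only; field names are assumptions
sample_incidents = [
    {'type': 'Phishing'},
    {'type': 'Malware'},
    {'type': 'Phishing'},
    {'title': 'record without a type'},
]
incident_counts = count_incidents_by_type(sample_incidents)
print(incident_counts)
```

A mapping in this shape could be returned from an additional dashboard endpoint and passed to `updateIncidentTypeChart` in place of the hard-coded `incidentTypes` object, since the chart already reads its categories from the object's keys and its bar heights from the values.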

Security Reporting

Automated Security Reports

Report Generation Framework

PYTHON
# Example: Security report generation framework
import jinja2
from datetime import datetime, timedelta
from typing import Dict, List, Any
import json
import smtplib
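# The standard library classes are MIMEText, MIMEMultipart, and MIMEBase;
# the aliases keep the MimeText-style names importable for this example
from email.mime.text import MIMEText as MimeText
from email.mime.multipart import MIMEMultipart as MimeMultipart
from email.mime.base import MIMEBase as MimeBase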
from email import encoders
import pdfkit
import os

class SecurityReportGenerator:
    """
    Generate automated security reports
    """
    
    def __init__(self):
        self.templates = self.load_report_templates()
        self.report_config = self.load_report_config()
    
    def load_report_templates(self) -> Dict[str, str]:
        """
        Load report templates
        """
        templates = {
            'executive_summary': """
# Security Executive Summary - {{ date }}

## Key Metrics
- **Total Vulnerabilities**: {{ metrics.total_vulnerabilities }}
- **Security Incidents**: {{ metrics.security_incidents }}
- **Compliance Score**: {{ metrics.compliance_score }}%
- **Mean Time to Remediate**: {{ metrics.mttr }} hours

## Risk Assessment
- **Current Risk Level**: {{ risk_level }}
- **Trend**: {{ trend_analysis }}

## Key Achievements
{% for achievement in achievements %}
- {{ achievement }}
{% endfor %}

## Areas for Improvement
{% for improvement in improvements %}
- {{ improvement }}
{% endfor %}

## Recommendations
{% for recommendation in recommendations %}
{{ loop.index }}. {{ recommendation }}
{% endfor %}
            """,
            'technical_report': """
# Technical Security Report - {{ date }}

## Vulnerability Analysis
### Critical Vulnerabilities ({{ vulnerability_summary.critical }})
{% for vuln in critical_vulnerabilities %}
- {{ vuln.name }}: {{ vuln.description }}
{% endfor %}

### High Vulnerabilities ({{ vulnerability_summary.high }})
{% for vuln in high_vulnerabilities %}
- {{ vuln.name }}: {{ vuln.description }}
{% endfor %}

## Incident Analysis
{% for incident in incidents %}
### Incident: {{ incident.title }}
- **Severity**: {{ incident.severity }}
- **Status**: {{ incident.status }}
- **Detected**: {{ incident.detected_at }}
- **Resolved**: {{ incident.resolved_at if incident.resolved_at else 'Not resolved' }}
- **Description**: {{ incident.description }}
{% endfor %}

## Compliance Status
- **Overall Compliance**: {{ compliance_score }}%
- **Framework**: {{ compliance_framework }}
- **Last Audit**: {{ last_audit_date }}

## Security Controls Status
{% for control, status in security_controls.items() %}
- **{{ control }}**: {{ status }}
{% endfor %}
            """,
            'compliance_report': """
# Compliance Report - {{ date }}

## Compliance Framework: {{ framework }}

### Controls Assessment
{% for control in controls %}
#### {{ control.name }}
- **Status**: {{ control.status }}
- **Evidence**: {{ control.evidence }}
- **Finding**: {{ control.finding if control.finding else 'None' }}
{% endfor %}

### Gap Analysis
{% for gap in gaps %}
- {{ gap.description }} ({{ gap.severity }})
{% endfor %}

### Recommendations
{% for rec in recommendations %}
- {{ rec }}
{% endfor %}

### Next Audit
- **Scheduled**: {{ next_audit_date }}
- **Focus Areas**: {{ focus_areas | join(', ') }}
            """
        }
        
        return templates
    
    def load_report_config(self) -> Dict[str, Any]:
        """
        Load report configuration
        """
        return {
            'report_formats': ['pdf', 'html', 'json'],
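            # Placeholder addresses; replace with your own distribution lists
            'distribution_lists': {
                'executive': ['ciso@example.com', 'security-leadership@example.com'],
                'technical': ['security-team@example.com', 'devops-team@example.com'],
                'compliance': ['compliance-team@example.com', 'audit-team@example.com']
            },
            # Note: 'security_summary' and 'trend_analysis' have no template
            # defined in load_report_templates() and would need one before use
            'scheduling': {
                'daily': ['security_summary'],
                'weekly': ['executive_summary', 'technical_report'],
                'monthly': ['compliance_report', 'trend_analysis']
            }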
        }
    
    def generate_executive_summary(self, data: Dict[str, Any]) -> str:
        """
        Generate executive summary report
        """
        template = jinja2.Template(self.templates['executive_summary'])
        
        report_data = {
            'date': datetime.utcnow().strftime('%Y-%m-%d'),
            'metrics': {
                'total_vulnerabilities': data.get('total_vulnerabilities', 0),
                'security_incidents': data.get('security_incidents', 0),
                'compliance_score': data.get('compliance_score', 0),
                'mttr': data.get('mean_time_to_remediate', 0)
            },
            'risk_level': self.calculate_risk_level(data),
            'trend_analysis': self.analyze_trends(data),
            'achievements': data.get('achievements', []),
            'improvements': data.get('improvements', []),
            'recommendations': data.get('recommendations', [])
        }
        
        return template.render(**report_data)
    
    def generate_technical_report(self, data: Dict[str, Any]) -> str:
        """
        Generate technical security report
        """
        template = jinja2.Template(self.templates['technical_report'])
        
        report_data = {
            'date': datetime.utcnow().strftime('%Y-%m-%d'),
            'vulnerability_summary': data.get('vulnerability_summary', {}),
            'critical_vulnerabilities': data.get('critical_vulnerabilities', []),
            'high_vulnerabilities': data.get('high_vulnerabilities', []),
            'incidents': data.get('incidents', []),
            'compliance_score': data.get('compliance_score', 0),
            'compliance_framework': data.get('compliance_framework', 'N/A'),
            'last_audit_date': data.get('last_audit_date', 'N/A'),
            'security_controls': data.get('security_controls', {})
        }
        
        return template.render(**report_data)
    
    def generate_compliance_report(self, data: Dict[str, Any]) -> str:
        """
        Generate compliance report
        """
        template = jinja2.Template(self.templates['compliance_report'])
        
        report_data = {
            'date': datetime.utcnow().strftime('%Y-%m-%d'),
            'framework': data.get('framework', 'N/A'),
            'controls': data.get('controls', []),
            'gaps': data.get('gaps', []),
            'recommendations': data.get('recommendations', []),
            'next_audit_date': data.get('next_audit_date', 'N/A'),
            'focus_areas': data.get('focus_areas', [])
        }
        
        return template.render(**report_data)
    
    def calculate_risk_level(self, data: Dict[str, Any]) -> str:
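        """
        Calculate overall risk level from a weighted score.

        Vulnerability and incident inputs are raw counts, while the
        compliance term is a percentage gap, so the score is not bounded
        to 0-100. The weights and thresholds below are illustrative.
        """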
        vuln_score = data.get('total_vulnerabilities', 0) * 0.3
        incident_score = data.get('security_incidents', 0) * 0.4
        compliance_score = (100 - data.get('compliance_score', 100)) * 0.3
        
        total_score = vuln_score + incident_score + compliance_score
        
        if total_score >= 80:
            return 'Critical'
        elif total_score >= 60:
            return 'High'
        elif total_score >= 40:
            return 'Medium'
        else:
            return 'Low'
    
    def analyze_trends(self, data: Dict[str, Any]) -> str:
        """
        Analyze security trends
        """
        # This would typically compare to previous periods
        # For this example, we'll provide a mock analysis
        return "Security posture is improving with reduced vulnerability count"
    
    def generate_pdf_report(self, html_content: str, filename: str):
        """
        Generate PDF report from HTML content
        """
        options = {
            'page-size': 'A4',
            'margin-top': '0.75in',
            'margin-right': '0.75in',
            'margin-bottom': '0.75in',
            'margin-left': '0.75in',
            'encoding': "UTF-8",
            'no-outline': None
        }
        
        pdfkit.from_string(html_content, filename, options=options)
    
    def send_report_email(self, subject: str, body: str, recipients: List[str], attachments: List[str] = None):
        """
        Send report via email
        """
        msg = MimeMultipart()
        msg['Subject'] = subject
        # Placeholder sender; set SMTP_FROM in the environment
        msg['From'] = os.getenv('SMTP_FROM', 'security-reports@example.com')
        msg['To'] = ', '.join(recipients)

        msg.attach(MimeText(body, 'html'))

        # Add attachments if provided
        if attachments:
            for file_path in attachments:
                with open(file_path, 'rb') as attachment:
                    part = MimeBase('application', 'octet-stream')
                    part.set_payload(attachment.read())
                encoders.encode_base64(part)
                # Pass the filename as a header parameter so it is quoted correctly
                part.add_header(
                    'Content-Disposition',
                    'attachment',
                    filename=os.path.basename(file_path)
                )
                msg.attach(part)

        # Send email
        smtp_server = os.getenv('SMTP_SERVER', 'localhost')
        smtp_port = int(os.getenv('SMTP_PORT', '587'))
        smtp_user = os.getenv('SMTP_USER')
        smtp_password = os.getenv('SMTP_PASSWORD')

        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            # Only authenticate when credentials are configured
            if smtp_user and smtp_password:
                server.login(smtp_user, smtp_password)
            server.send_message(msg)
    
    def generate_and_distribute_report(self, report_type: str, data: Dict[str, Any], recipients: List[str] = None):
        """
        Generate and distribute security report
        """
        # Generate report content based on type
        if report_type == 'executive_summary':
            content = self.generate_executive_summary(data)
        elif report_type == 'technical_report':
            content = self.generate_technical_report(data)
        elif report_type == 'compliance_report':
            content = self.generate_compliance_report(data)
        else:
            raise ValueError(f"Unknown report type: {report_type}")
        
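        # Generate PDF. The templates above produce Markdown, while pdfkit
        # expects HTML, so convert `content` to HTML first if properly
        # formatted output is needed.
        filename = f"security_report_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.pdf"
        self.generate_pdf_report(content, filename)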
        
        # Send email if recipients provided
        if recipients:
            self.send_report_email(
                subject=f"Security Report - {datetime.utcnow().strftime('%Y-%m-%d')}",
                body=content,
                recipients=recipients,
                attachments=[filename]
            )
        
        return filename

# Example usage
def generate_security_reports():
    """
    Example of generating security reports
    """
    generator = SecurityReportGenerator()
    
    # Sample data
    report_data = {
        'total_vulnerabilities': 45,
        'security_incidents': 3,
        'compliance_score': 92.5,
        'mean_time_to_remediate': 18.5,
        'vulnerability_summary': {
            'critical': 2,
            'high': 8,
            'medium': 20,
            'low': 15
        },
        'critical_vulnerabilities': [
            {'name': 'CVE-2023-1234', 'description': 'SQL Injection in user authentication'},
            {'name': 'CVE-2023-5678', 'description': 'Authentication bypass vulnerability'}
        ],
        'high_vulnerabilities': [
            {'name': 'CVE-2023-9012', 'description': 'Cross-site scripting vulnerability'}
        ],
        'incidents': [
            {
                'title': 'Suspicious login attempt',
                'severity': 'high',
                'status': 'resolved',
                'detected_at': '2023-12-01T10:30:00Z',
                'resolved_at': '2023-12-01T12:15:00Z',
                'description': 'Multiple failed login attempts from unusual location'
            }
        ],
        'achievements': [
            'Reduced critical vulnerabilities by 25%',
            'Improved mean time to detect from 4 hours to 1 hour'
        ],
        'improvements': [
            'Vulnerability remediation process needs acceleration',
            'Security awareness training completion rate at 78% (target 90%)'
        ],
        'recommendations': [
            'Implement automated security testing in CI/CD pipeline',
            'Conduct quarterly penetration testing',
            'Enhance security monitoring capabilities'
        ]
    }
    
    # Generate executive summary
    exec_report = generator.generate_executive_summary(report_data)
    print("Executive Summary Generated")
    
    # Generate technical report
    tech_report = generator.generate_technical_report(report_data)
    print("Technical Report Generated")
    
    # Generate and send report
    filename = generator.generate_and_distribute_report(
        report_type='executive_summary',
        data=report_data,
        recipients=['security-team@example.com']  # placeholder address
    )
    
    print(f"Report generated and sent: {filename}")
    
    return exec_report, tech_report

# Run example
# exec_report, tech_report = generate_security_reports()
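As a worked check of the scoring used in `calculate_risk_level`, the sketch below reproduces the same weights (0.3, 0.4, 0.3) and thresholds as standalone functions and applies them to the sample data from the example. The names `risk_score` and `risk_level` are hypothetical helpers introduced only for this illustration; they are not part of the `SecurityReportGenerator` class.

```python
from typing import Any, Dict


def risk_score(data: Dict[str, Any]) -> float:
    """Weighted score using the same weights as calculate_risk_level."""
    vuln_score = data.get('total_vulnerabilities', 0) * 0.3
    incident_score = data.get('security_incidents', 0) * 0.4
    compliance_gap = (100 - data.get('compliance_score', 100)) * 0.3
    return vuln_score + incident_score + compliance_gap


def risk_level(score: float) -> str:
    """Map a score to a level using the same thresholds as calculate_risk_level."""
    if score >= 80:
        return 'Critical'
    if score >= 60:
        return 'High'
    if score >= 40:
        return 'Medium'
    return 'Low'


# Same headline figures as the sample report data
sample = {'total_vulnerabilities': 45, 'security_incidents': 3, 'compliance_score': 92.5}
score = risk_score(sample)
# 45 * 0.3 + 3 * 0.4 + (100 - 92.5) * 0.3 = 13.5 + 1.2 + 2.25 = 16.95
print(f"{score:.2f} -> {risk_level(score)}")  # prints "16.95 -> Low"
```

With the sample data the result is "Low" even though two critical vulnerabilities are open, because the score uses raw counts rather than severity. Teams that want critical findings to dominate may prefer to weight the per-severity counts from `vulnerability_summary` instead.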

Conclusion

Security metrics and reporting give DevSecOps teams the visibility and accountability they need to maintain and improve security posture in continuous delivery environments. Effective security metrics should be:

  • Actionable: Metrics should drive specific actions and improvements
  • Measurable: Metrics should be quantifiable and trackable over time
  • Relevant: Metrics should align with business and security objectives
  • Timely: Metrics should be collected and reported in real-time or near real-time
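To make "trackable over time" concrete, one option is a period-over-period comparison, which could also stand in for the mock `analyze_trends` method shown earlier. This is a minimal sketch under assumptions: the function `compare_periods`, the `LOWER_IS_BETTER` set, and the previous-period figures are hypothetical and not part of the framework above.

```python
from typing import Dict

# Metrics where a decrease counts as an improvement (assumed for this sketch)
LOWER_IS_BETTER = {'total_vulnerabilities', 'security_incidents', 'mean_time_to_remediate'}


def compare_periods(current: Dict[str, float], previous: Dict[str, float]) -> Dict[str, str]:
    """Label each metric present in both periods as improving, worsening, or stable."""
    trends = {}
    for name, now in current.items():
        if name not in previous:
            continue  # no baseline to compare against
        before = previous[name]
        if now == before:
            trends[name] = 'stable'
            continue
        decreased = now < before
        improving = decreased if name in LOWER_IS_BETTER else not decreased
        trends[name] = 'improving' if improving else 'worsening'
    return trends


# Current values match the sample report data; previous values are made up
current = {'total_vulnerabilities': 45, 'security_incidents': 3,
           'compliance_score': 92.5, 'mean_time_to_remediate': 18.5}
previous = {'total_vulnerabilities': 60, 'security_incidents': 3,
            'compliance_score': 90.0, 'mean_time_to_remediate': 16.0}

for metric, trend in compare_periods(current, previous).items():
    print(f"{metric}: {trend}")
```

A comparison like this only works when each metric is collected the same way in every period, which is why the direction of improvement is declared explicitly rather than inferred.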

Successful security metrics and reporting depend on a framework that covers vulnerabilities, incidents, compliance, and process performance, and that delivers clear, actionable reports to each audience: executive summaries for leadership, technical reports for engineering and security teams, and compliance reports for auditors.

In the next article, we'll explore DevSecOps security culture and training, examining how to build and maintain a strong security culture within development teams.
