DevSecOps Compliance and Governance
Overview
DevSecOps compliance and governance ensure that security practices align with regulatory requirements, industry standards, and organizational policies. This article explores how to maintain compliance in automated environments while preserving the agility and speed that DevSecOps enables.
Understanding Compliance in DevSecOps
The Challenge of Compliance in Agile Environments
Traditional Compliance Models vs. DevSecOps
Traditional compliance approaches often conflict with DevSecOps principles:
Waterfall vs. Agile Compliance
- Traditional: Annual or quarterly compliance assessments
- DevSecOps: Continuous compliance monitoring
- Impact: Need for real-time compliance verification
Manual vs. Automated Controls
- Traditional: Manual evidence collection and assessment
- DevSecOps: Automated control monitoring and reporting
- Impact: Shift from point-in-time to continuous assurance
Static vs. Dynamic Infrastructure
- Traditional: Static infrastructure with known configurations
- DevSecOps: Dynamic, ephemeral infrastructure
- Impact: Need for infrastructure monitoring and configuration management
Compliance Framework Alignment
SOX (Sarbanes-Oxley Act)
SOX compliance in DevSecOps environments requires:
- Access Controls: Automated IAM management and monitoring
- Change Management: Automated change tracking and approval
- Segregation of Duties: Automated role separation in CI/CD pipelines
- Documentation: Automated evidence collection and retention
HIPAA (Health Insurance Portability and Accountability Act)
HIPAA compliance in DevSecOps requires:
- Data Protection: Automated encryption and access controls
- Audit Trails: Automated logging and monitoring of data access
- Incident Response: Automated breach detection and reporting
- Business Associate Agreements: Automated vendor compliance verification
PCI-DSS (Payment Card Industry Data Security Standard)
PCI-DSS compliance in DevSecOps requires:
- Network Security: Automated firewall and segmentation controls
- Data Protection: Automated encryption of cardholder data
- Vulnerability Management: Automated scanning and patching
- Access Control: Automated user provisioning and de-provisioning
GDPR (General Data Protection Regulation)
GDPR compliance in DevSecOps requires:
- Data Minimization: Automated data classification and retention
- Consent Management: Automated consent tracking and withdrawal
- Right to Erasure: Automated data deletion processes
- Data Portability: Automated data export capabilities
Continuous Compliance Philosophy
From Point-in-Time to Continuous
Traditional compliance models provide point-in-time assurance, while DevSecOps requires continuous compliance:
Point-in-Time Compliance
- Annual Audits: Compliance verified at specific points in time
- Manual Evidence: Collected manually for audit periods
- Static Controls: Controls tested at specific intervals
- Periodic Reports: Compliance status reported periodically
Continuous Compliance
- Real-time Monitoring: Ongoing compliance status monitoring
- Automated Evidence: Continuous evidence collection
- Dynamic Controls: Controls monitored and adjusted in real-time
- Instant Reporting: Real-time compliance status reporting
Compliance as Code
Infrastructure Compliance
Implement compliance controls as code:
# Example: AWS compliance with Terraform
resource "aws_s3_bucket" "secure_bucket" {
bucket = var.bucket_name
# Compliance requirement: Enable server-side encryption
server_side_encryption_configuration {
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
}
}
# Compliance requirement: Enable versioning
versioning {
enabled = true
}
# Compliance requirement: Enable logging
logging {
target_bucket = aws_s3_bucket.log_bucket.id
target_prefix = "log/"
}
# Compliance requirement: Block public access
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# Example: Compliance check with Checkov
resource "aws_iam_role" "compliant_role" {
name = "compliant-role"
# Compliance requirement: Least privilege access
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "ec2.amazonaws.com"
}
}
]
})
# Compliance requirement: No inline policies (for audit trail)
# Attach managed policies separately
}
# Managed policy with specific permissions
resource "aws_iam_role_policy_attachment" "attach_policy" {
role = aws_iam_role.compliant_role.name
policy_arn = "arn:aws:iam::aws:policy/ReadOnlyAccess"
}Application Compliance
# Example: Application compliance with security controls
import os
import logging
from functools import wraps
from flask import Flask, request, g
import jwt
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
app = Flask(__name__)
# Compliance: Audit logging decorator
def audit_log(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Log access for compliance
logging.info(f"Access to {request.endpoint} by {request.remote_addr} at {request.timestamp}")
# Add compliance metadata
g.audit_id = os.urandom(16).hex()
result = func(*args, **kwargs)
# Log completion
logging.info(f"Completed {request.endpoint} with audit_id {g.audit_id}")
return result
return wrapper
# Compliance: Data protection middleware
@app.before_request
def compliance_check():
# Check for compliance requirements
if request.method in ['POST', 'PUT', 'PATCH']:
# Ensure sensitive data is encrypted
if 'sensitive_data' in request.form:
# Log for compliance that sensitive data was detected
logging.warning(f"Sensitive data detected in {request.path}")
# Ensure authentication for protected endpoints
if request.endpoint in app.config.get('PROTECTED_ENDPOINTS', []):
token = request.headers.get('Authorization')
if not token:
logging.warning(f"Unauthorized access attempt to {request.endpoint}")
return {'error': 'Authentication required'}, 401
# Compliance: Data retention policy
def enforce_data_retention(data, retention_period_days=365):
"""
Enforce data retention policy as per compliance requirements
"""
import datetime
cutoff_date = datetime.datetime.now() - datetime.timedelta(days=retention_period_days)
# Filter data based on retention policy
filtered_data = [item for item in data if item.created_date > cutoff_date]
# Log for compliance that data retention was applied
logging.info(f"Applied data retention policy, removed {len(data) - len(filtered_data)} records")
return filtered_data
# Compliance: Personal data handling
class PersonalDataManager:
def __init__(self):
self.encryption_key = os.environ.get('PERSONAL_DATA_ENCRYPTION_KEY')
def encrypt_personal_data(self, data):
"""
Encrypt personal data as per GDPR compliance
"""
# Implementation of encryption
encrypted_data = self._encrypt(data, self.encryption_key)
logging.info("Personal data encrypted for GDPR compliance")
return encrypted_data
def right_to_erasure(self, user_id):
"""
Implement right to erasure as per GDPR
"""
# Delete all personal data for user
deleted_count = self._delete_personal_data(user_id)
logging.info(f"Erased {deleted_count} personal data records for user {user_id} as per GDPR Article 17")
return deleted_count
def data_portability(self, user_id):
"""
Implement data portability as per GDPR
"""
user_data = self._get_personal_data(user_id)
portable_data = self._format_for_portability(user_data)
logging.info(f"Exported personal data for user {user_id} as per GDPR Article 20")
return portable_dataCompliance Automation Framework
Automated Compliance Monitoring
Continuous Compliance Engine
# Example: Continuous compliance monitoring engine
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any
import boto3
from azure.identity import DefaultAzureCredential
from google.cloud import securitycenter
class ContinuousComplianceEngine:
def __init__(self):
self.aws_client = boto3.client('config')
self.azure_credential = DefaultAzureCredential()
self.gcp_client = securitycenter.SecurityCenterClient()
self.compliance_rules = self.load_compliance_rules()
self.alert_channels = self.setup_alert_channels()
def load_compliance_rules(self) -> Dict[str, Any]:
"""
Load compliance rules from configuration
"""
# Load rules from compliance framework files
rules = {}
for framework in ['SOX', 'HIPAA', 'PCI-DSS', 'GDPR']:
with open(f'compliance-rules/{framework.lower()}.json', 'r') as f:
rules[framework] = json.load(f)
return rules
async def monitor_aws_compliance(self) -> List[Dict[str, Any]]:
"""
Monitor AWS compliance in real-time
"""
compliance_results = []
# Get AWS Config compliance data
response = self.aws_client.describe_compliance_by_config_rule()
for rule in response['ComplianceByConfigRules']:
rule_name = rule['ConfigRuleName']
# Check against compliance rules
for framework, rules in self.compliance_rules.items():
if rule_name in rules:
compliance_status = {
'timestamp': datetime.utcnow().isoformat(),
'resource_type': 'AWS',
'rule_name': rule_name,
'framework': framework,
'compliance_status': rule['Compliance']['ComplianceType'],
'details': rule
}
compliance_results.append(compliance_status)
# Log compliance status
if rule['Compliance']['ComplianceType'] != 'COMPLIANT':
logging.warning(f"Non-compliant AWS resource: {rule_name}")
# Trigger alert if non-compliant
await self.trigger_compliance_alert(compliance_status)
return compliance_results
async def monitor_azure_compliance(self) -> List[Dict[str, Any]]:
"""
Monitor Azure compliance in real-time
"""
compliance_results = []
# Use Azure Security Center API
from azure.mgmt.security import SecurityCenter
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
security_client = SecurityCenter(credential, subscription_id=os.environ['AZURE_SUBSCRIPTION_ID'])
# Get security recommendations
recommendations = security_client.assessments.list()
for rec in recommendations:
compliance_status = {
'timestamp': datetime.utcnow().isoformat(),
'resource_type': 'Azure',
'assessment_id': rec.id,
'compliance_status': rec.status.code,
'category': rec.category,
'severity': rec.metadata.severity
}
compliance_results.append(compliance_status)
if rec.status.code != 'Healthy':
logging.warning(f"Non-compliant Azure resource: {rec.id}")
await self.trigger_compliance_alert(compliance_status)
return compliance_results
async def monitor_gcp_compliance(self) -> List[Dict[str, Any]]:
"""
Monitor GCP compliance in real-time
"""
compliance_results = []
# Use GCP Security Command Center
org_id = f"organizations/{os.environ['GCP_ORGANIZATION_ID']}"
response = self.gcp_client.list_assets(request={"parent": org_id})
for asset in response:
# Check compliance based on asset type and configuration
compliance_result = self.evaluate_asset_compliance(asset)
compliance_results.append(compliance_result)
if not compliance_result['is_compliant']:
logging.warning(f"Non-compliant GCP asset: {asset.name}")
await self.trigger_compliance_alert(compliance_result)
return compliance_results
def evaluate_asset_compliance(self, asset: Dict[str, Any]) -> Dict[str, Any]:
"""
Evaluate compliance of individual asset
"""
asset_type = asset.asset_type
compliance_result = {
'timestamp': datetime.utcnow().isoformat(),
'asset_name': asset.name,
'asset_type': asset_type,
'is_compliant': True,
'violations': [],
'frameworks': []
}
# Check against compliance rules based on asset type
for framework, rules in self.compliance_rules.items():
for rule in rules.get(asset_type, []):
if not self.check_rule_compliance(asset, rule):
compliance_result['is_compliant'] = False
compliance_result['violations'].append(rule['id'])
compliance_result['frameworks'].append(framework)
return compliance_result
async def trigger_compliance_alert(self, compliance_status: Dict[str, Any]):
"""
Trigger compliance alert through configured channels
"""
alert_message = {
'type': 'COMPLIANCE_VIOLATION',
'timestamp': compliance_status['timestamp'],
'resource': compliance_status.get('rule_name') or compliance_status.get('assessment_id'),
'status': compliance_status.get('compliance_status'),
'details': compliance_status
}
# Send alerts to all configured channels
for channel in self.alert_channels:
await channel.send_alert(alert_message)
async def run_compliance_monitoring_cycle(self):
"""
Run a complete compliance monitoring cycle
"""
logging.info("Starting compliance monitoring cycle")
# Monitor all cloud providers
aws_results = await self.monitor_aws_compliance()
azure_results = await self.monitor_azure_compliance()
gcp_results = await self.monitor_gcp_compliance()
# Aggregate results
all_results = aws_results + azure_results + gcp_results
# Generate compliance report
report = self.generate_compliance_report(all_results)
# Store compliance evidence
self.store_compliance_evidence(report)
logging.info(f"Compliance monitoring cycle completed: {len(all_results)} resources checked")
return report
def generate_compliance_report(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Generate comprehensive compliance report
"""
report = {
'timestamp': datetime.utcnow().isoformat(),
'total_resources': len(results),
'compliant_resources': len([r for r in results if r.get('compliance_status') == 'COMPLIANT']),
'non_compliant_resources': len([r for r in results if r.get('compliance_status') != 'COMPLIANT']),
'compliance_percentage': 0,
'framework_summary': {},
'details': results
}
if report['total_resources'] > 0:
report['compliance_percentage'] = (report['compliant_resources'] / report['total_resources']) * 100
# Generate framework-specific summaries
for result in results:
framework = result.get('framework', 'Unknown')
if framework not in report['framework_summary']:
report['framework_summary'][framework] = {
'total': 0,
'compliant': 0,
'non_compliant': 0
}
report['framework_summary'][framework]['total'] += 1
if result.get('compliance_status') == 'COMPLIANT':
report['framework_summary'][framework]['compliant'] += 1
else:
report['framework_summary'][framework]['non_compliant'] += 1
return report
# Example usage
async def main():
engine = ContinuousComplianceEngine()
# Run compliance monitoring every hour
while True:
report = await engine.run_compliance_monitoring_cycle()
# Log summary
print(f"Compliance Report: {report['compliance_percentage']:.2f}% compliant")
# Wait for next cycle
await asyncio.sleep(3600) # 1 hour
if __name__ == '__main__':
asyncio.run(main())Compliance Dashboard and Reporting
#!/bin/bash
# Example: Compliance reporting automation
COMPLIANCE_REPORT_DIR="/var/reports/compliance"
ARCHIVE_DIR="/var/archive/compliance"
# Function to generate compliance report
generate_compliance_report() {
local report_date=$(date +%Y%m%d)
local report_file="$COMPLIANCE_REPORT_DIR/compliance-report-$report_date.json"
echo "Generating compliance report for $report_date..."
# Collect compliance data from various sources
cat << EOF > $report_file
{
"report_date": "$report_date",
"report_timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"summary": {
"total_resources": $(get_total_resources),
"compliant_resources": $(get_compliant_resources),
"non_compliant_resources": $(get_non_compliant_resources),
"compliance_percentage": $(calculate_compliance_percentage)
},
"frameworks": {
"SOX": $(get_framework_compliance SOX),
"HIPAA": $(get_framework_compliance HIPAA),
"PCI-DSS": $(get_framework_compliance PCIDSS),
"GDPR": $(get_framework_compliance GDPR)
},
"recent_violations": [
$(get_recent_violations)
],
"trends": {
"last_7_days": [
$(get_weekly_trend)
]
}
}
EOF
# Archive previous reports
archive_old_reports
echo "Compliance report generated: $report_file"
}
# Function to get total resources
get_total_resources() {
# Query compliance database
mysql -u $DB_USER -p$DB_PASS -D $COMPLIANCE_DB -sN -e "SELECT COUNT(*) FROM resources;"
}
# Function to get compliant resources
get_compliant_resources() {
mysql -u $DB_USER -p$DB_PASS -D $COMPLIANCE_DB -sN -e "SELECT COUNT(*) FROM resources WHERE compliance_status = 'COMPLIANT';"
}
# Function to get non-compliant resources
get_non_compliant_resources() {
mysql -u $DB_USER -p$DB_PASS -D $COMPLIANCE_DB -sN -e "SELECT COUNT(*) FROM resources WHERE compliance_status != 'COMPLIANT';"
}
# Function to calculate compliance percentage
calculate_compliance_percentage() {
local total=$(get_total_resources)
local compliant=$(get_compliant_resources)
if [ $total -eq 0 ]; then
echo "0"
else
echo "scale=2; $compliant * 100 / $total" | bc
fi
}
# Function to get framework compliance
get_framework_compliance() {
local framework=$1
local total=$(mysql -u $DB_USER -p$DB_PASS -D $COMPLIANCE_DB -sN -e "SELECT COUNT(*) FROM compliance_checks WHERE framework = '$framework';")
local compliant=$(mysql -u $DB_USER -p$DB_PASS -D $COMPLIANCE_DB -sN -e "SELECT COUNT(*) FROM compliance_checks WHERE framework = '$framework' AND status = 'COMPLIANT';")
cat << EOF
{
"total": $total,
"compliant": $compliant,
"percentage": $(if [ $total -eq 0 ]; then echo "0"; else echo "scale=2; $compliant * 100 / $total" | bc; fi)
}
EOF
}
# Function to get recent violations
get_recent_violations() {
mysql -u $DB_USER -p$DB_PASS -D $COMPLIANCE_DB -sN -e "
SELECT CONCAT('{\"resource\":\"', resource_id, '\",\"violation\":\"', violation_description, '\",\"timestamp\":\"', timestamp, '\"}')
FROM compliance_violations
WHERE timestamp >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
ORDER BY timestamp DESC
LIMIT 10;" | tr '\n' ',' | sed 's/,$//'
}
# Function to archive old reports
archive_old_reports() {
local archive_date=$(date -d '7 days ago' +%Y%m%d)
mkdir -p $ARCHIVE_DIR
# Move reports older than 7 days to archive
find $COMPLIANCE_REPORT_DIR -name "compliance-report-*.json" -mtime +7 -exec mv {} $ARCHIVE_DIR/ \;
}
# Function to send compliance alerts
send_compliance_alerts() {
local compliance_percentage=$(calculate_compliance_percentage)
local threshold=95 # 95% compliance threshold
if (( $(echo "$compliance_percentage < $threshold" | bc -l) )); then
# Send alert to compliance team
echo "Compliance Alert: Current compliance is ${compliance_percentage}% (threshold: ${threshold}%)"
# Send email notification
send_mail -s "Compliance Alert: Low Compliance Percentage" -t [email protected] << EOF
Compliance Alert:
Current compliance percentage: $compliance_percentage%
Threshold: $threshold%
Please investigate and take corrective action.
Report: $COMPLIANCE_REPORT_DIR/compliance-report-$(date +%Y%m%d).json
EOF
fi
}
# Main execution
if [ ! -d "$COMPLIANCE_REPORT_DIR" ]; then
mkdir -p $COMPLIANCE_REPORT_DIR
fi
# Generate report
generate_compliance_report
# Send alerts if needed
send_compliance_alerts
# Update dashboard
update_compliance_dashboard
echo "Compliance reporting completed"Compliance Evidence Management
Automated Evidence Collection
# Example: Automated compliance evidence collector
import os
import hashlib
import json
from datetime import datetime
from typing import Dict, List, Any
import boto3
import paramiko
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_private_key
class ComplianceEvidenceCollector:
def __init__(self, config_file: str):
self.config = self.load_config(config_file)
self.evidence_store = self.initialize_evidence_store()
self.signing_key = self.load_signing_key()
def load_config(self, config_file: str) -> Dict[str, Any]:
"""
Load compliance evidence collection configuration
"""
with open(config_file, 'r') as f:
return json.load(f)
def initialize_evidence_store(self):
"""
Initialize evidence storage (could be S3, database, etc.)
"""
# Example: Initialize S3 evidence store
s3_client = boto3.client('s3')
return s3_client
def load_signing_key(self):
"""
Load private key for evidence signing
"""
with open(self.config['signing_key_path'], 'rb') as key_file:
pem_data = key_file.read()
return load_pem_private_key(pem_data, password=None)
def collect_aws_evidence(self, resource_ids: List[str]) -> List[Dict[str, Any]]:
"""
Collect evidence for AWS resources
"""
evidence_collection = []
ec2_client = boto3.client('ec2')
iam_client = boto3.client('iam')
s3_client = boto3.client('s3')
for resource_id in resource_ids:
if resource_id.startswith('i-'): # EC2 Instance
instance_data = ec2_client.describe_instances(InstanceIds=[resource_id])
evidence = {
'resource_id': resource_id,
'resource_type': 'EC2_Instance',
'collection_timestamp': datetime.utcnow().isoformat(),
'data': instance_data['Reservations'][0]['Instances'][0],
'checksum': self.calculate_checksum(instance_data)
}
evidence_collection.append(evidence)
elif resource_id.startswith('arn:aws:iam'): # IAM Resource
# Extract resource type and name from ARN
arn_parts = resource_id.split(':')
resource_type = arn_parts[5].split('/')[0]
resource_name = arn_parts[5].split('/')[-1]
if resource_type == 'role':
role_data = iam_client.get_role(RoleName=resource_name)
evidence = {
'resource_id': resource_id,
'resource_type': 'IAM_Role',
'collection_timestamp': datetime.utcnow().isoformat(),
'data': role_data['Role'],
'checksum': self.calculate_checksum(role_data)
}
evidence_collection.append(evidence)
return evidence_collection
def collect_application_evidence(self, app_endpoints: List[str]) -> List[Dict[str, Any]]:
"""
Collect evidence from application endpoints
"""
evidence_collection = []
for endpoint in app_endpoints:
# Collect application security evidence
app_evidence = self.get_app_security_config(endpoint)
evidence = {
'endpoint': endpoint,
'resource_type': 'Application',
'collection_timestamp': datetime.utcnow().isoformat(),
'data': app_evidence,
'checksum': self.calculate_checksum(app_evidence)
}
evidence_collection.append(evidence)
return evidence_collection
def get_app_security_config(self, endpoint: str) -> Dict[str, Any]:
"""
Get application security configuration
"""
import requests
# Get security headers
response = requests.get(endpoint)
security_headers = {}
for header in ['X-Frame-Options', 'X-XSS-Protection', 'X-Content-Type-Options',
'Strict-Transport-Security', 'Content-Security-Policy']:
if header in response.headers:
security_headers[header] = response.headers[header]
# Get SSL certificate information
import ssl
import socket
hostname = endpoint.replace('https://', '').split('/')[0]
context = ssl.create_default_context()
try:
with socket.create_connection((hostname, 443)) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert()
ssl_info = {
'subject': dict(x[0] for x in cert['subject']),
'issuer': dict(x[0] for x in cert['issuer']),
'version': cert['version'],
'serial_number': cert['serialNumber'],
'not_before': cert['notBefore'],
'not_after': cert['notAfter']
}
except Exception as e:
ssl_info = {'error': str(e)}
return {
'security_headers': security_headers,
'ssl_certificate': ssl_info,
'server_signature': response.headers.get('Server', 'Unknown')
}
def collect_network_evidence(self, network_configs: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""
Collect network security evidence
"""
evidence_collection = []
for net_config in network_configs:
# Connect to network device and collect configuration
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh_client.connect(
hostname=net_config['host'],
username=net_config['username'],
password=net_config['password']
)
# Execute commands to get security configuration
stdin, stdout, stderr = ssh_client.exec_command('show running-config | include security')
security_config = stdout.read().decode()
evidence = {
'device_id': net_config['host'],
'resource_type': 'Network_Device',
'collection_timestamp': datetime.utcnow().isoformat(),
'data': {
'security_config': security_config,
'device_model': net_config['model'],
'firmware_version': net_config.get('firmware_version', 'unknown')
},
'checksum': self.calculate_checksum(security_config)
}
evidence_collection.append(evidence)
finally:
ssh_client.close()
return evidence_collection
def calculate_checksum(self, data: Any) -> str:
"""
Calculate checksum for evidence data
"""
data_str = json.dumps(data, sort_keys=True, default=str)
return hashlib.sha256(data_str.encode()).hexdigest()
def sign_evidence(self, evidence: Dict[str, Any]) -> Dict[str, Any]:
"""
Sign evidence with private key
"""
evidence_copy = evidence.copy()
data_to_sign = json.dumps(evidence_copy, sort_keys=True, default=str)
signature = self.signing_key.sign(
data_to_sign.encode(),
padding.PKCS1v15(),
hashes.SHA256()
)
evidence_copy['signature'] = signature.hex()
return evidence_copy
def store_evidence(self, evidence: List[Dict[str, Any]], framework: str) -> str:
"""
Store evidence in compliance repository
"""
# Create evidence package
evidence_package = {
'framework': framework,
'collection_timestamp': datetime.utcnow().isoformat(),
'evidence_count': len(evidence),
'evidence_items': [self.sign_evidence(item) for item in evidence],
'package_checksum': self.calculate_checksum(evidence)
}
# Store in S3
s3_key = f"compliance-evidence/{framework}/{datetime.utcnow().strftime('%Y/%m/%d')}/evidence-{datetime.utcnow().strftime('%H%M%S')}.json"
self.evidence_store.put_object(
Bucket=self.config['evidence_bucket'],
Key=s3_key,
Body=json.dumps(evidence_package, indent=2),
ServerSideEncryption='AES256'
)
return s3_key
def collect_and_store_evidence(self, framework: str) -> str:
"""
Collect and store evidence for a compliance framework
"""
# Define evidence collection based on framework
if framework == 'SOX':
# SOX-specific evidence collection
aws_resources = self.config.get('sox_aws_resources', [])
app_endpoints = self.config.get('sox_app_endpoints', [])
evidence = []
evidence.extend(self.collect_aws_evidence(aws_resources))
evidence.extend(self.collect_application_evidence(app_endpoints))
elif framework == 'HIPAA':
# HIPAA-specific evidence collection
aws_resources = self.config.get('hipaa_aws_resources', [])
network_configs = self.config.get('hipaa_network_configs', [])
evidence = []
evidence.extend(self.collect_aws_evidence(aws_resources))
evidence.extend(self.collect_network_evidence(network_configs))
elif framework == 'PCI-DSS':
# PCI-DSS-specific evidence collection
aws_resources = self.config.get('pcidss_aws_resources', [])
app_endpoints = self.config.get('pcidss_app_endpoints', [])
evidence = []
evidence.extend(self.collect_aws_evidence(aws_resources))
evidence.extend(self.collect_application_evidence(app_endpoints))
# Store collected evidence
s3_key = self.store_evidence(evidence, framework)
print(f"Collected and stored {len(evidence)} evidence items for {framework} at {s3_key}")
return s3_key
# Example usage
collector = ComplianceEvidenceCollector('compliance-config.json')
# Collect evidence for different frameworks
sox_evidence = collector.collect_and_store_evidence('SOX')
hipaa_evidence = collector.collect_and_store_evidence('HIPAA')
pcidss_evidence = collector.collect_and_store_evidence('PCI-DSS')Governance Framework Implementation
Risk Management Integration
Risk Assessment Automation
# Example: Automated risk assessment for compliance
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import numpy as np
from datetime import datetime, timedelta
class AutomatedRiskAssessment:
def __init__(self):
self.risk_model = self.train_risk_model()
self.threat_intelligence = self.load_threat_intelligence()
self.compliance_mapping = self.load_compliance_mappings()
def train_risk_model(self):
"""
Train risk assessment model using historical data
"""
# Load historical risk data
risk_data = pd.read_csv('historical-risk-data.csv')
# Features for risk assessment
features = [
'vulnerability_severity', 'patch_age', 'access_level',
'data_classification', 'network_segment', 'threat_level',
'compliance_gap', 'control_effectiveness'
]
X = risk_data[features]
y = risk_data['risk_score'] # Target variable
# Split data and train model
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
return model
def load_threat_intelligence(self):
"""
Load current threat intelligence data
"""
# This would typically come from threat intelligence feeds
return {
'active_threats': [
{'id': 'CVE-2023-XXXX', 'severity': 'HIGH', 'affects': ['web_servers']},
{'id': 'CVE-2023-YYYY', 'severity': 'CRITICAL', 'affects': ['databases']}
],
'geographic_threats': {
'us-east-1': ['DDoS', 'Phishing'],
'eu-west-1': ['APT', 'Ransomware']
}
}
def load_compliance_mappings(self):
"""
Load compliance requirement mappings
"""
return {
'SOX': {
'access_control': ['IAM', 'Authentication'],
'change_management': ['CI/CD', 'Deployment'],
'audit_logging': ['Logging', 'Monitoring']
},
'HIPAA': {
'data_encryption': ['Storage', 'Transmission'],
'access_logs': ['Authentication', 'Authorization'],
'incident_response': ['Monitoring', 'Alerting']
},
'PCI-DSS': {
'network_security': ['Firewalls', 'Segmentation'],
'data_protection': ['Encryption', 'Tokenization'],
'vulnerability_mgmt': ['Scanning', 'Patching']
}
}
def assess_resource_risk(self, resource_config: Dict[str, Any]) -> Dict[str, Any]:
"""
Assess risk for a specific resource
"""
# Extract features from resource configuration
features = self.extract_features(resource_config)
# Predict risk score using trained model
risk_score = self.risk_model.predict_proba([features])[0][1] # Probability of high risk
# Determine risk level
if risk_score >= 0.8:
risk_level = 'CRITICAL'
elif risk_score >= 0.6:
risk_level = 'HIGH'
elif risk_score >= 0.4:
risk_level = 'MEDIUM'
else:
risk_level = 'LOW'
# Identify affected compliance frameworks
affected_frameworks = self.identify_affected_compliance(resource_config, risk_level)
# Generate mitigation recommendations
recommendations = self.generate_recommendations(resource_config, risk_level)
risk_assessment = {
'resource_id': resource_config['id'],
'risk_score': float(risk_score),
'risk_level': risk_level,
'affected_frameworks': affected_frameworks,
'recommendations': recommendations,
'assessment_timestamp': datetime.utcnow().isoformat(),
'confidence': float(np.max(self.risk_model.predict_proba([features])[0]))
}
return risk_assessment
def extract_features(self, resource_config: Dict[str, Any]) -> List[float]:
"""
Extract features for risk assessment model
"""
# Example feature extraction (would be more complex in practice)
features = [
resource_config.get('vulnerability_severity', 0) / 10.0, # Normalize to 0-1
resource_config.get('patch_age_days', 0) / 365.0, # Days normalized to years
resource_config.get('access_level', 1) / 5.0, # Assuming 1-5 scale
resource_config.get('data_classification', 1) / 5.0, # Assuming 1-5 scale
resource_config.get('network_segment_risk', 1) / 5.0, # Assuming 1-5 scale
resource_config.get('threat_level', 1) / 5.0, # Assuming 1-5 scale
resource_config.get('compliance_gap_score', 1) / 5.0, # Assuming 1-5 scale
resource_config.get('control_effectiveness', 1) / 5.0 # Assuming 1-5 scale
]
return features
def identify_affected_compliance(self, resource_config: Dict[str, Any], risk_level: str) -> List[str]:
"""
Identify which compliance frameworks are affected by the risk
"""
affected_frameworks = []
# Check if resource type affects specific compliance frameworks
resource_type = resource_config.get('type', '').lower()
data_classification = resource_config.get('data_classification', '').upper()
# SOX compliance (financial data)
if resource_type in ['database', 'application'] and data_classification in ['FINANCIAL', 'REGULATORY']:
affected_frameworks.append('SOX')
# HIPAA compliance (health data)
if resource_type in ['database', 'application'] and data_classification in ['PHI', 'HEALTH']:
affected_frameworks.append('HIPAA')
# PCI-DSS compliance (payment data)
if resource_type in ['payment_gateway', 'card_processing'] and data_classification in ['PCI', 'PAYMENT']:
affected_frameworks.append('PCI-DSS')
return affected_frameworks
def generate_recommendations(self, resource_config: Dict[str, Any], risk_level: str) -> List[str]:
"""
Generate risk mitigation recommendations
"""
recommendations = []
if risk_level in ['HIGH', 'CRITICAL']:
# Add specific recommendations based on resource type and configuration
if resource_config.get('public_access', False):
recommendations.append("Restrict public access to resource")
if not resource_config.get('encryption_enabled', False):
recommendations.append("Enable encryption for data at rest and in transit")
if resource_config.get('authentication_required', True) and not resource_config.get('multi_factor_auth', False):
recommendations.append("Implement multi-factor authentication")
if resource_config.get('logging_enabled', False) == False:
recommendations.append("Enable comprehensive logging and monitoring")
return recommendations
def run_comprehensive_risk_assessment(self, resources: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Run comprehensive risk assessment for multiple resources
"""
assessments = []
for resource in resources:
assessment = self.assess_resource_risk(resource)
assessments.append(assessment)
# Generate overall risk report
total_resources = len(assessments)
critical_risks = len([a for a in assessments if a['risk_level'] == 'CRITICAL'])
high_risks = len([a for a in assessments if a['risk_level'] == 'HIGH'])
report = {
'report_timestamp': datetime.utcnow().isoformat(),
'total_resources_assessed': total_resources,
'critical_risks': critical_risks,
'high_risks': high_risks,
'medium_risks': len([a for a in assessments if a['risk_level'] == 'MEDIUM']),
'low_risks': len([a for a in assessments if a['risk_level'] == 'LOW']),
'risk_distribution': {
'critical': critical_risks / total_resources if total_resources > 0 else 0,
'high': high_risks / total_resources if total_resources > 0 else 0
},
'individual_assessments': assessments,
'top_recommendations': self.get_top_recommendations(assessments)
}
return report
def get_top_recommendations(self, assessments: List[Dict[str, Any]]) -> List[str]:
"""
Get top recommendations from all assessments
"""
all_recommendations = []
for assessment in assessments:
if assessment['risk_level'] in ['HIGH', 'CRITICAL']:
all_recommendations.extend(assessment['recommendations'])
# Count frequency of recommendations
from collections import Counter
recommendation_counts = Counter(all_recommendations)
# Return top 5 recommendations
return [rec for rec, count in recommendation_counts.most_common(5)]
# Example usage
risk_assessor = AutomatedRiskAssessment()
# Sample resources to assess
sample_resources = [
{
'id': 'db-production-001',
'type': 'database',
'vulnerability_severity': 8.5,
'patch_age_days': 120,
'access_level': 4,
'data_classification': 'FINANCIAL',
'network_segment_risk': 2,
'threat_level': 3,
'compliance_gap_score': 2,
'control_effectiveness': 3,
'public_access': False,
'encryption_enabled': True,
'multi_factor_auth': False,
'logging_enabled': True
},
{
'id': 'api-public-001',
'type': 'application',
'vulnerability_severity': 7.2,
'patch_age_days': 45,
'access_level': 5,
'data_classification': 'PHI',
'network_segment_risk': 4,
'threat_level': 4,
'compliance_gap_score': 3,
'control_effectiveness': 2,
'public_access': True,
'encryption_enabled': False,
'multi_factor_auth': False,
'logging_enabled': False
}
]
# Run risk assessment
risk_report = risk_assessor.run_comprehensive_risk_assessment(sample_resources)
print(json.dumps(risk_report, indent=2))Change Management for Compliance
Automated Change Approval Workflow
# Example: Automated change approval workflow for compliance
name: Compliant Change Management
on:
pull_request:
types: [opened, synchronize, reopened]
jobs:
compliance-check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
# Check if change affects compliance-critical components
- name: Identify compliance impact
id: compliance-check
run: |
# Check if changed files are in compliance-critical directories
COMPLIANCE_FILES_CHANGED=$(git diff --name-only HEAD^ HEAD | grep -E "(config|security|compliance|infrastructure)" | wc -l)
if [ $COMPLIANCE_FILES_CHANGED -gt 0 ]; then
echo "COMPLIANCE_IMPACT=true" >> $GITHUB_OUTPUT
echo "Change affects compliance-critical components"
# Get list of affected files
AFFECTED_FILES=$(git diff --name-only HEAD^ HEAD | grep -E "(config|security|compliance|infrastructure)")
echo "Affected files: $AFFECTED_FILES"
# Determine if additional approvals are needed
if echo "$AFFECTED_FILES" | grep -q "pci"; then
echo "PCI-DSS_APPROVAL_NEEDED=true" >> $GITHUB_OUTPUT
fi
if echo "$AFFECTED_FILES" | grep -q "hipaa"; then
echo "HIPAA_APPROVAL_NEEDED=true" >> $GITHUB_OUTPUT
fi
else
echo "COMPLIANCE_IMPACT=false" >> $GITHUB_OUTPUT
fi
# Run compliance validation if needed
- name: Run compliance validation
if: steps.compliance-check.outputs.COMPLIANCE_IMPACT == 'true'
run: |
# Run compliance validation scripts
./scripts/validate-compliance.sh
# Security scanning for compliance changes
- name: Security scan for compliance changes
if: steps.compliance-check.outputs.COMPLIANCE_IMPACT == 'true'
run: |
# Run enhanced security scanning
./scripts/enhanced-security-scan.sh
# Check for required approvals
- name: Check required approvals
if: steps.compliance-check.outputs.COMPLIANCE_IMPACT == 'true'
run: |
# Check if required approvals are present
APPROVALS_NEEDED=1
if [ "${{ steps.compliance-check.outputs.PCI_DSS_APPROVAL_NEEDED }}" = "true" ]; then
APPROVALS_NEEDED=$((APPROVALS_NEEDED + 1))
echo "PCI-DSS changes require additional approval"
fi
if [ "${{ steps.compliance-check.outputs.HIPAA_APPROVAL_NEEDED }}" = "true" ]; then
APPROVALS_NEEDED=$((APPROVALS_NEEDED + 1))
echo "HIPAA changes require additional approval"
fi
# Count actual approvals
ACTUAL_APPROVALS=$(curl -s -H "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \
"https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}/reviews" \
| jq -r '.[] | select(.state == "APPROVED") | .user.login' | wc -l)
if [ $ACTUAL_APPROVALS -lt $APPROVALS_NEEDED ]; then
echo "Insufficient approvals. Required: $APPROVALS_NEEDED, Actual: $ACTUAL_APPROVALS"
exit 1
fi
automated-testing:
needs: compliance-check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
# Run compliance-specific tests
- name: Run compliance tests
run: |
pytest tests/compliance/ -v
# Run integration tests for compliance features
- name: Run compliance integration tests
run: |
pytest tests/integration/test_compliance.py -v
approval-gate:
needs: [compliance-check, automated-testing]
runs-on: ubuntu-latest
steps:
- name: Final compliance check
run: |
# Final check before merge
echo "All compliance checks passed"
echo "Ready for merge with compliance approval"Audit Trail Management
Automated Audit Trail Collection
# Example: Automated audit trail management
import sqlite3
import json
from datetime import datetime
from typing import Dict, List, Any
import hashlib
import hmac
class AuditTrailManager:
def __init__(self, db_path: str = 'audit_trail.db'):
self.db_path = db_path
self.init_database()
self.secret_key = self.load_secret_key()
def init_database(self):
"""
Initialize audit trail database
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create audit trail table
cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_id TEXT UNIQUE NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
user_id TEXT NOT NULL,
action TEXT NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT NOT NULL,
old_values TEXT,
new_values TEXT,
ip_address TEXT,
user_agent TEXT,
session_id TEXT,
compliance_framework TEXT,
signature TEXT NOT NULL
)
''')
# Create indexes for performance
cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON audit_events(timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_id ON audit_events(user_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_resource_id ON audit_events(resource_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_compliance_framework ON audit_events(compliance_framework)')
conn.commit()
conn.close()
def load_secret_key(self) -> bytes:
"""
Load secret key for audit event signing
"""
# In practice, this would come from a secure configuration
return b'audit_trail_signing_key' # This should be loaded securely
def calculate_signature(self, event_data: Dict[str, Any]) -> str:
"""
Calculate HMAC signature for audit event
"""
# Create canonical representation of event data
canonical_data = json.dumps(event_data, sort_keys=True, default=str)
# Calculate HMAC signature
signature = hmac.new(
self.secret_key,
canonical_data.encode(),
hashlib.sha256
).hexdigest()
return signature
def log_audit_event(self, event: Dict[str, Any]) -> str:
"""
Log an audit event to the trail
"""
# Generate unique event ID
event_id = f"audit_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}_{hashlib.md5(str(event).encode()).hexdigest()[:8]}"
# Calculate signature for the event
signature = self.calculate_signature({
'event_id': event_id,
**event
})
# Prepare event for storage
audit_event = {
'event_id': event_id,
'user_id': event['user_id'],
'action': event['action'],
'resource_type': event['resource_type'],
'resource_id': event['resource_id'],
'old_values': json.dumps(event.get('old_values', {})),
'new_values': json.dumps(event.get('new_values', {})),
'ip_address': event.get('ip_address', ''),
'user_agent': event.get('user_agent', ''),
'session_id': event.get('session_id', ''),
'compliance_framework': event.get('compliance_framework', ''),
'signature': signature
}
# Insert into database
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO audit_events
(event_id, user_id, action, resource_type, resource_id, old_values, new_values,
ip_address, user_agent, session_id, compliance_framework, signature)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', tuple(audit_event.values()))
conn.commit()
conn.close()
return event_id
def get_audit_events(self, filters: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
Retrieve audit events with optional filtering
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query = "SELECT * FROM audit_events WHERE 1=1"
params = []
if filters:
if 'user_id' in filters:
query += " AND user_id = ?"
params.append(filters['user_id'])
if 'resource_id' in filters:
query += " AND resource_id = ?"
params.append(filters['resource_id'])
if 'action' in filters:
query += " AND action = ?"
params.append(filters['action'])
if 'compliance_framework' in filters:
query += " AND compliance_framework = ?"
params.append(filters['compliance_framework'])
if 'start_date' in filters:
query += " AND timestamp >= ?"
params.append(filters['start_date'])
if 'end_date' in filters:
query += " AND timestamp <= ?"
params.append(filters['end_date'])
query += " ORDER BY timestamp DESC"
cursor.execute(query, params)
rows = cursor.fetchall()
# Get column names
columns = [description[0] for description in cursor.description]
# Convert rows to dictionaries
events = []
for row in rows:
event_dict = dict(zip(columns, row))
# Parse JSON fields
event_dict['old_values'] = json.loads(event_dict['old_values'])
event_dict['new_values'] = json.loads(event_dict['new_values'])
events.append(event_dict)
conn.close()
return events
def verify_audit_integrity(self, event_id: str) -> bool:
"""
Verify the integrity of a specific audit event
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM audit_events WHERE event_id = ?", (event_id,))
row = cursor.fetchone()
if not row:
conn.close()
return False
# Get column names
columns = [description[0] for description in cursor.description]
event = dict(zip(columns, row))
conn.close()
# Recreate the data that was signed
original_data = {
'event_id': event['event_id'],
'user_id': event['user_id'],
'action': event['action'],
'resource_type': event['resource_type'],
'resource_id': event['resource_id'],
'old_values': event['old_values'],
'new_values': event['new_values'],
'ip_address': event['ip_address'],
'user_agent': event['user_agent'],
'session_id': event['session_id'],
'compliance_framework': event['compliance_framework']
}
# Calculate signature for verification
calculated_signature = self.calculate_signature(original_data)
return hmac.compare_digest(calculated_signature, event['signature'])
def generate_audit_report(self, start_date: str, end_date: str, compliance_framework: str = None) -> Dict[str, Any]:
"""
Generate comprehensive audit report
"""
filters = {
'start_date': start_date,
'end_date': end_date
}
if compliance_framework:
filters['compliance_framework'] = compliance_framework
events = self.get_audit_events(filters)
# Generate report statistics
report = {
'report_metadata': {
'generated_at': datetime.utcnow().isoformat(),
'period_start': start_date,
'period_end': end_date,
'compliance_framework': compliance_framework
},
'summary': {
'total_events': len(events),
'unique_users': len(set(event['user_id'] for event in events)),
'unique_resources': len(set(f"{event['resource_type']}:{event['resource_id']}" for event in events)),
'actions_by_type': {}
},
'events': events,
'compliance_verification': {
'integrity_check_passed': all(self.verify_audit_integrity(event['event_id']) for event in events)
}
}
# Count actions by type
for event in events:
action = event['action']
report['summary']['actions_by_type'][action] = report['summary']['actions_by_type'].get(action, 0) + 1
return report
# Example usage
audit_manager = AuditTrailManager()
# Log some sample audit events
event1_id = audit_manager.log_audit_event({
'user_id': 'user123',
'action': 'UPDATE',
'resource_type': 'Database',
'resource_id': 'db-prod-001',
'old_values': {'encryption': False},
'new_values': {'encryption': True},
'ip_address': '192.168.1.100',
'user_agent': 'Mozilla/5.0...',
'session_id': 'sess_abc123',
'compliance_framework': 'SOX'
})
event2_id = audit_manager.log_audit_event({
'user_id': 'admin456',
'action': 'DELETE',
'resource_type': 'PatientRecord',
'resource_id': 'pat_rec_789',
'old_values': {'status': 'active'},
'new_values': {'status': 'deleted'},
'ip_address': '192.168.1.101',
'user_agent': 'Mozilla/5.0...',
'session_id': 'sess_def456',
'compliance_framework': 'HIPAA'
})
# Generate audit report
report = audit_manager.generate_audit_report(
start_date='2023-01-01',
end_date='2023-12-31',
compliance_framework='SOX'
)
print(json.dumps(report, indent=2))Compliance Reporting and Documentation
Automated Compliance Reporting
Compliance Dashboard Implementation
# Example: Compliance dashboard backend
from flask import Flask, jsonify, request
import plotly.graph_objects as go
import plotly.express as px
from plotly.utils import PlotlyJSONEncoder
import json
from datetime import datetime, timedelta
import calendar
app = Flask(__name__)
class ComplianceDashboard:
def __init__(self):
self.data_source = self.initialize_data_source()
def initialize_data_source(self):
"""
Initialize connection to compliance data sources
"""
# This would connect to your compliance databases/data warehouses
return {
'compliance_db': self.connect_to_compliance_db(),
'security_scans': self.connect_to_security_scans(),
'audit_logs': self.connect_to_audit_logs()
}
def connect_to_compliance_db(self):
"""
Connect to compliance database
"""
# Placeholder for actual database connection
return MockComplianceDB()
def connect_to_security_scans(self):
"""
Connect to security scan results
"""
return MockSecurityScans()
def connect_to_audit_logs(self):
"""
Connect to audit logs
"""
return MockAuditLogs()
def get_compliance_summary(self) -> Dict[str, Any]:
"""
Get overall compliance summary
"""
# Get compliance data
compliance_data = self.data_source['compliance_db'].get_current_compliance()
summary = {
'overall_compliance': compliance_data['overall_compliance'],
'frameworks': compliance_data['frameworks'],
'trending': compliance_data['trending'],
'recent_violations': compliance_data['recent_violations'][:5],
'upcoming_audits': compliance_data['upcoming_audits']
}
return summary
def get_compliance_trend_chart(self, days: int = 90) -> str:
"""
Generate compliance trend chart
"""
# Get historical compliance data
historical_data = self.data_source['compliance_db'].get_historical_compliance(days)
# Create trend chart
fig = go.Figure()
for framework, data in historical_data.items():
fig.add_trace(go.Scatter(
x=[datetime.fromisoformat(d['date']) for d in data],
y=[d['compliance_percentage'] for d in data],
mode='lines+markers',
name=framework,
hovertemplate='%{y:.2f}%<extra></extra>'
))
fig.update_layout(
title='Compliance Trend Over Time',
xaxis_title='Date',
yaxis_title='Compliance Percentage',
height=400
)
return json.dumps(fig, cls=PlotlyJSONEncoder)
def get_violation_analysis_chart(self) -> str:
"""
Generate violation analysis chart
"""
violations = self.data_source['compliance_db'].get_violation_analysis()
fig = px.bar(
x=list(violations.keys()),
y=list(violations.values()),
labels={'x': 'Violation Type', 'y': 'Count'},
title='Compliance Violations by Type'
)
fig.update_layout(height=400)
return json.dumps(fig, cls=PlotlyJSONEncoder)
def get_resource_compliance_chart(self) -> str:
"""
Generate resource compliance chart
"""
resource_compliance = self.data_source['compliance_db'].get_resource_compliance()
fig = go.Figure(data=[
go.Bar(name='Compliant', x=list(resource_compliance.keys()),
y=[v['compliant'] for v in resource_compliance.values()]),
go.Bar(name='Non-Compliant', x=list(resource_compliance.keys()),
y=[v['non_compliant'] for v in resource_compliance.values()])
])
fig.update_layout(
barmode='stack',
title='Resource Compliance by Type',
height=400
)
return json.dumps(fig, cls=PlotlyJSONEncoder)
class MockComplianceDB:
"""
Mock compliance database for demonstration
"""
def get_current_compliance(self):
return {
'overall_compliance': 94.2,
'frameworks': {
'SOX': 96.5,
'HIPAA': 92.1,
'PCI-DSS': 95.8,
'GDPR': 93.7
},
'trending': {
'last_month': 91.8,
'current': 94.2,
'trend': 'up'
},
'recent_violations': [
{'id': 'VIOL-001', 'type': 'Missing encryption', 'severity': 'High', 'timestamp': '2023-12-01T10:30:00Z'},
{'id': 'VIOL-002', 'type': 'Unrestricted access', 'severity': 'Medium', 'timestamp': '2023-12-02T14:15:00Z'},
{'id': 'VIOL-003', 'type': 'Missing audit log', 'severity': 'Low', 'timestamp': '2023-12-03T09:45:00Z'}
],
'upcoming_audits': [
{'framework': 'SOX', 'date': '2024-01-15', 'type': 'Quarterly'},
{'framework': 'HIPAA', 'date': '2024-02-20', 'type': 'Annual'}
]
}
def get_historical_compliance(self, days):
import random
dates = [(datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d') for i in range(days, 0, -1)]
return {
'SOX': [{'date': date, 'compliance_percentage': round(random.uniform(90, 98), 2)} for date in dates],
'HIPAA': [{'date': date, 'compliance_percentage': round(random.uniform(88, 96), 2)} for date in dates],
'PCI-DSS': [{'date': date, 'compliance_percentage': round(random.uniform(92, 99), 2)} for date in dates]
}
def get_violation_analysis(self):
return {
'Missing Encryption': 15,
'Unrestricted Access': 8,
'Missing Audit Logs': 12,
'Outdated Software': 23,
'Weak Authentication': 7
}
def get_resource_compliance(self):
return {
'Databases': {'compliant': 45, 'non_compliant': 3},
'Applications': {'compliant': 67, 'non_compliant': 8},
'Network Devices': {'compliant': 23, 'non_compliant': 2},
'Storage Systems': {'compliant': 34, 'non_compliant': 1}
}
class MockSecurityScans:
"""
Mock security scan results
"""
pass
class MockAuditLogs:
"""
Mock audit logs
"""
pass
dashboard = ComplianceDashboard()
@app.route('/api/compliance/summary')
def compliance_summary():
"""
Get compliance summary
"""
summary = dashboard.get_compliance_summary()
return jsonify(summary)
@app.route('/api/compliance/trend-chart')
def compliance_trend_chart():
"""
Get compliance trend chart
"""
days = int(request.args.get('days', 90))
chart_json = dashboard.get_compliance_trend_chart(days)
return jsonify(json.loads(chart_json))
@app.route('/api/compliance/violation-chart')
def violation_analysis_chart():
"""
Get violation analysis chart
"""
chart_json = dashboard.get_violation_analysis_chart()
return jsonify(json.loads(chart_json))
@app.route('/api/compliance/resource-chart')
def resource_compliance_chart():
"""
Get resource compliance chart
"""
chart_json = dashboard.get_resource_compliance_chart()
return jsonify(json.loads(chart_json))
@app.route('/api/compliance/full-report')
def full_compliance_report():
"""
Get full compliance report
"""
summary = dashboard.get_compliance_summary()
trend_chart = json.loads(dashboard.get_compliance_trend_chart())
violation_chart = json.loads(dashboard.get_violation_analysis_chart())
resource_chart = json.loads(dashboard.get_resource_compliance_chart())
report = {
'summary': summary,
'charts': {
'trend': trend_chart,
'violations': violation_chart,
'resources': resource_chart
},
'generated_at': datetime.utcnow().isoformat()
}
return jsonify(report)
if __name__ == '__main__':
app.run(debug=True, port=5001)Conclusion
DevSecOps compliance and governance require a fundamental shift from traditional compliance models to continuous, automated approaches. The key to success lies in implementing robust automation frameworks that can monitor, verify, and report compliance status in real-time while preserving the agility that DevSecOps provides.
Effective compliance in DevSecOps environments combines:
- Automated evidence collection and storage
- Real-time monitoring and alerting
- Continuous risk assessment
- Comprehensive audit trails
- Integration with development and deployment pipelines
Organizations that successfully implement DevSecOps compliance and governance will achieve better security postures, more efficient compliance processes, and greater agility in responding to changing regulatory requirements.
In the next article, we'll explore DevSecOps incident response and threat management, examining how to detect, respond to, and recover from security incidents in DevSecOps environments.