DevSecOps Security Testing and Assurance
Overview
DevSecOps security testing and assurance integrate security validation throughout the software development lifecycle, ensuring that security is not an afterthought but an integral part of the development process. This article explores how to implement comprehensive security testing strategies that provide continuous security assurance while maintaining development velocity.
Security Testing in the Development Lifecycle
Shift-Left Security Testing
Early Integration of Security Tests
Security testing begins at the earliest stages of development, moving security validation left in the development pipeline:
PYTHON
# Example: Security testing framework integration
import unittest
from unittest.mock import patch, MagicMock
import requests
from typing import Dict, List, Any
import json
from datetime import datetime
class SecurityTestFramework:
    """
    Registry and runner for security tests across the development lifecycle.

    Tests are grouped by category; static tests receive a file path, dynamic
    tests receive a target URL.  Each run returns one result record per test
    that either reported findings or raised (a clean pass produces no record).
    """

    def __init__(self):
        # Category name -> list of {'func', 'severity', 'registered_at'} records.
        self.test_categories = {
            'static_analysis': [],
            'dynamic_analysis': [],
            'dependency_scanning': [],
            'configuration_testing': [],
            'runtime_security': []
        }
        self.test_results = []
        self.vulnerability_db = self.load_vulnerability_database()

    def load_vulnerability_database(self) -> Dict[str, Any]:
        """
        Load known vulnerabilities database (placeholder lookup tables).
        """
        return {
            'cwe': {},          # Common Weakness Enumeration
            'cve': {},          # Common Vulnerabilities and Exposures
            'owasp_top_10': {}  # OWASP Top 10 vulnerabilities
        }

    def _register(self, category: str, test_func, severity: str) -> None:
        """Record *test_func* under *category* with its severity label."""
        # Local import keeps this block self-contained; timezone-aware "now"
        # replaces datetime.utcnow(), which is deprecated since Python 3.12.
        from datetime import datetime, timezone
        self.test_categories[category].append({
            'func': test_func,
            'severity': severity,
            'registered_at': datetime.now(timezone.utc)
        })

    def register_static_analysis_test(self, test_func, severity='medium'):
        """
        Register a static analysis security test.
        """
        self._register('static_analysis', test_func, severity)

    def register_dynamic_analysis_test(self, test_func, severity='high'):
        """
        Register a dynamic analysis security test.
        """
        self._register('dynamic_analysis', test_func, severity)

    def _execute(self, test_config, target_key: str, target: str):
        """
        Run one registered test against one target.

        Returns a result record when the test reports findings or raises an
        exception (severity 'error'); returns None on a clean pass so callers
        only collect actionable records.
        """
        from datetime import datetime, timezone
        try:
            findings = test_config['func'](target)
            if not findings:
                return None
            severity = test_config['severity']
        except Exception as e:  # a broken test must not abort the whole run
            findings = f'Test execution error: {str(e)}'
            severity = 'error'
        return {
            'test_name': test_config['func'].__name__,
            target_key: target,
            'severity': severity,
            'findings': findings,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

    def run_static_analysis_tests(self, code_files: List[str]) -> List[Dict[str, Any]]:
        """
        Run all registered static analysis tests against every file in *code_files*.
        """
        results = []
        for test_config in self.test_categories['static_analysis']:
            for file_path in code_files:
                record = self._execute(test_config, 'file_path', file_path)
                if record:
                    results.append(record)
        return results

    def run_dynamic_analysis_tests(self, target_url: str) -> List[Dict[str, Any]]:
        """
        Run all registered dynamic analysis tests against *target_url*.
        """
        results = []
        for test_config in self.test_categories['dynamic_analysis']:
            record = self._execute(test_config, 'target_url', target_url)
            if record:
                results.append(record)
        return results
# Example security test functions
def test_sql_injection_static(file_path: str) -> List[Dict[str, Any]]:
"""
Static analysis test for SQL injection vulnerabilities
"""
findings = []
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
for i, line in enumerate(lines, 1):
# Look for common SQL injection patterns
if any(pattern in line.lower() for pattern in ['select * from', 'insert into', 'update ', 'delete from']):
if any(dangerous in line for dangerous in ['+ "', '+ variable', '+ input']):
findings.append({
'line_number': i,
'code_snippet': line.strip(),
'vulnerability_type': 'SQL Injection',
'severity': 'high'
})
return findings
def test_xss_static(file_path: str) -> List[Dict[str, Any]]:
"""
Static analysis test for XSS vulnerabilities
"""
findings = []
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
for i, line in enumerate(lines, 1):
# Look for common XSS patterns
if 'innerHTML' in line or 'document.write' in line:
findings.append({
'line_number': i,
'code_snippet': line.strip(),
'vulnerability_type': 'Cross-Site Scripting',
'severity': 'high'
})
if 'eval(' in line or 'Function(' in line:
findings.append({
'line_number': i,
'code_snippet': line.strip(),
'vulnerability_type': 'Code Injection',
'severity': 'critical'
})
return findings
def test_authentication_bypass(target_url: str) -> List[Dict[str, Any]]:
    """
    Probe *target_url* with forged authentication headers.

    Any response other than 401/403 is reported as a potential bypass;
    unreachable endpoints are silently skipped.
    """
    # Header sets that commonly slip past naive authentication checks.
    probe_headers = (
        {'Cookie': 'auth=admin; role=admin'},
        {'Authorization': 'Bearer invalid_token'},
        {'X-Forwarded-For': '127.0.0.1'},
    )
    results: List[Dict[str, Any]] = []
    for headers in probe_headers:
        try:
            reply = requests.get(target_url, headers=headers, timeout=10)
        except requests.RequestException:
            continue  # endpoint unreachable: nothing to report
        if reply.status_code in (401, 403):
            continue  # request was properly rejected
        results.append({
            'test_type': 'Authentication Bypass',
            'headers_used': headers,
            'response_status': reply.status_code,
            'severity': 'critical'
        })
    return results
def test_sql_injection_dynamic(target_url: str) -> List[Dict[str, Any]]:
    """
    Probe common query parameters with SQL payloads and look for leaked
    database error text in the response body.

    Probing a parameter stops at its first hit; request failures are skipped.
    """
    payloads = (
        "' OR '1'='1",
        "' UNION SELECT * FROM users--",
        "'; DROP TABLE users--",
        "admin'--",
        "1' AND 1=1--",
    )
    probed_params = ('id', 'user', 'email', 'search', 'filter')
    # Substrings that suggest a database error leaked into the response.
    leak_markers = (
        'sql syntax', 'mysql', 'postgresql', 'sqlite', 'oracle',
        'error', 'exception', 'traceback', 'at line',
    )
    results: List[Dict[str, Any]] = []
    for name in probed_params:
        for payload in payloads:
            try:
                reply = requests.get(f"{target_url}?{name}={payload}", timeout=10)
            except requests.RequestException:
                continue
            body = reply.text.lower()
            hits = [marker for marker in leak_markers if marker in body]
            if hits:
                results.append({
                    'test_type': 'SQL Injection',
                    'parameter': name,
                    'payload_used': payload,
                    'response_status': reply.status_code,
                    'error_indicators_found': hits,
                    'severity': 'high'
                })
                break  # parameter confirmed vulnerable; move to the next one
    return results
# Initialize the security testing framework
# Module-level singleton consumed by run_security_tests_in_ci_cd() below.
security_framework = SecurityTestFramework()
# Register security tests
# (the second argument overrides each register method's default severity)
security_framework.register_static_analysis_test(test_sql_injection_static, 'high')
security_framework.register_static_analysis_test(test_xss_static, 'high')
security_framework.register_dynamic_analysis_test(test_authentication_bypass, 'critical')
security_framework.register_dynamic_analysis_test(test_sql_injection_dynamic, 'high')
# Example usage in development workflow
def run_security_tests_in_ci_cd():
    """
    Gate a CI/CD pipeline on the security test results.

    Runs the registered static tests over the application sources and the
    dynamic tests against the test environment, then returns False (block
    deployment) when any finding has 'critical' or 'error' severity,
    True otherwise.
    """
    print("Running security tests...")

    # Static analysis over the application source files.
    source_files = ['app/main.py', 'app/auth.py', 'app/models.py']
    static_findings = security_framework.run_static_analysis_tests(source_files)
    print(f"Static analysis found {len(static_findings)} issues")

    # Dynamic analysis against the deployed test environment.
    endpoint = "http://test-env.company.com/api/users"
    dynamic_findings = security_framework.run_dynamic_analysis_tests(endpoint)
    print(f"Dynamic analysis found {len(dynamic_findings)} issues")

    combined = static_findings + dynamic_findings
    blockers = [item for item in combined if item['severity'] in ('critical', 'error')]
    if blockers:
        print(f"Critical security issues found: {len(blockers)}")
        print("Blocking deployment due to critical security issues")
        return False
    print("No critical security issues found. Proceeding with deployment.")
    return True
# Run the example
# run_security_tests_in_ci_cd()

Pre-Commit Security Hooks
Implementing security checks before code commits:
BASH
#!/bin/bash
# .git/hooks/pre-commit - Security pre-commit hook
#
# Gates every commit on: (1) a secret-pattern grep over staged files,
# (2) bandit for Python, (3) eslint for JavaScript, (4) dependency audits.
# Any failing check aborts the commit with a non-zero exit status.

echo "Running pre-commit security checks..."

# --- Secret detection --------------------------------------------------------
# FIX: the previous version passed a comma-separated list to a single
# --include flag; grep treats that as ONE literal glob, so the check never
# matched any file.  Filter candidate filenames explicitly instead, and use
# --diff-filter=ACMR so deleted files are not grepped.
STAGED_SENSITIVE_FILES=$(git diff --cached --name-only --diff-filter=ACMR \
    | grep -E '\.(py|js|java|json|yaml|yml|xml|env)$')
if [ -n "$STAGED_SENSITIVE_FILES" ] && \
   echo "$STAGED_SENSITIVE_FILES" | xargs grep -l -i -E "(password|secret|key|token|private)" 2>/dev/null; then
    echo "ERROR: Potential secrets detected in committed code!"
    echo "Please remove sensitive information before committing."
    exit 1
fi

# --- Python static analysis --------------------------------------------------
CHANGED_PYTHON_FILES=$(git diff --cached --name-only --diff-filter=ACMR | grep "\.py$" | tr '\n' ' ')
if [ -n "$CHANGED_PYTHON_FILES" ]; then
    echo "Running Python security analysis on: $CHANGED_PYTHON_FILES"
    # Run bandit security analyzer (skipped when not installed)
    if command -v bandit &> /dev/null; then
        # shellcheck disable=SC2086  # word splitting of the file list is intended
        if ! bandit -r $CHANGED_PYTHON_FILES; then
            echo "Bandit security scan failed. Please address the issues."
            exit 1
        fi
    fi
fi

# --- JavaScript static analysis ----------------------------------------------
CHANGED_JS_FILES=$(git diff --cached --name-only --diff-filter=ACMR | grep "\.js$" | tr '\n' ' ')
if [ -n "$CHANGED_JS_FILES" ]; then
    echo "Running JavaScript security analysis on: $CHANGED_JS_FILES"
    # Run eslint with security rules (skipped when not installed).
    # FIX: --fix removed — a pre-commit *check* must not rewrite working-tree
    # files that would then differ from the already-staged content.
    if command -v eslint &> /dev/null; then
        # shellcheck disable=SC2086
        if ! eslint --quiet $CHANGED_JS_FILES; then
            echo "ESLint security scan failed. Please address the issues."
            exit 1
        fi
    fi
fi

# --- Dependency vulnerability audits -----------------------------------------
if [ -f "requirements.txt" ] || [ -f "package-lock.json" ] || [ -f "Gemfile.lock" ]; then
    echo "Checking for vulnerable dependencies..."

    # For Python projects
    if [ -f "requirements.txt" ] && command -v pip-audit &> /dev/null; then
        if ! pip-audit -r requirements.txt; then
            echo "Dependency vulnerability scan failed. Please update vulnerable packages."
            exit 1
        fi
    fi

    # For Node.js projects
    if [ -f "package-lock.json" ] && command -v npm &> /dev/null; then
        if ! npm audit --audit-level high; then
            echo "NPM dependency audit failed. Please address vulnerabilities."
            exit 1
        fi
    fi
fi

echo "All security checks passed. Commit approved."
exit 0

Continuous Security Testing Pipeline
Security Testing as Code
Implementing security tests as part of the application codebase:
PYTHON
# security_tests/application_security_tests.py
import unittest
import requests
from unittest.mock import patch, MagicMock
import json
from typing import Dict, Any
class ApplicationSecurityTests(unittest.TestCase):
    """
    Security tests for the application.

    Black-box tests run over HTTP against a deployed test instance
    (self.base_url); they assert on status codes, headers and response
    bodies rather than internal state.
    """
    def setUp(self):
        """
        Set up test environment.
        """
        # Deployed test instance; a single Session reuses connections
        # across the requests made by each test.
        self.base_url = "http://test-app.company.com"
        self.session = requests.Session()

    def test_input_validation(self):
        """
        Test input validation for common attack vectors.

        Sends SQL-injection and XSS payloads to the search endpoint and
        checks that the service neither crashes (5xx) nor leaks database
        error text.
        """
        # Test SQL injection (and one reflected-XSS payload)
        malicious_inputs = [
            "' OR '1'='1",
            "'; DROP TABLE users--",
            "admin'--",
            "<script>alert('xss')</script>"
        ]
        for malicious_input in malicious_inputs:
            with self.subTest(input=malicious_input):
                # Test in different contexts
                response = self.session.post(
                    f"{self.base_url}/api/search",
                    json={'query': malicious_input}
                )
                # Should not return 5xx errors or expose internal details
                self.assertNotEqual(response.status_code, 500)
                # Should not contain database error messages
                self.assertNotIn('sql', response.text.lower())
                self.assertNotIn('database', response.text.lower())

    def test_authentication_required_endpoints(self):
        """
        Test that protected endpoints require authentication.
        """
        protected_endpoints = [
            '/api/admin/users',
            '/api/admin/config',
            '/api/user/profile',
            '/api/user/settings'
        ]
        for endpoint in protected_endpoints:
            with self.subTest(endpoint=endpoint):
                # Try to access without authentication
                response = self.session.get(f"{self.base_url}{endpoint}")
                # Should return 401 or 403
                self.assertIn(response.status_code, [401, 403])

    def test_authorization_boundaries(self):
        """
        Test that users cannot access resources they shouldn't
        (horizontal privilege escalation / IDOR).
        """
        # Test user trying to access another user's data
        response = self.session.get(
            f"{self.base_url}/api/user/other-user-id/profile",
            headers={'Authorization': 'Bearer valid-user-token'}
        )
        # Should return 403 Forbidden or similar
        self.assertEqual(response.status_code, 403)

    def test_csrf_protection(self):
        """
        Test CSRF protection mechanisms.
        """
        # Try to make state-changing request without CSRF token
        response = self.session.post(
            f"{self.base_url}/api/user/profile",
            data={'name': 'New Name'},
            headers={'Content-Type': 'application/x-www-form-urlencoded'}
        )
        # Should be rejected
        self.assertIn(response.status_code, [400, 401, 403])

    def test_rate_limiting(self):
        """
        Test rate limiting protections.

        Hammers a public endpoint and expects HTTP 429 before 100 requests;
        the final assertion fails when the limit was never reached.
        """
        # Make multiple requests rapidly
        for i in range(100):
            response = self.session.get(f"{self.base_url}/api/public/endpoint")
            if response.status_code == 429:  # Too Many Requests
                # Rate limiting is working
                break
        # If we didn't hit the rate limit, it might not be properly implemented
        self.assertEqual(response.status_code, 429, "Rate limiting not properly implemented")

    def test_secure_headers(self):
        """
        Test that security headers are properly set on the landing page.
        """
        response = self.session.get(f"{self.base_url}/")
        # Check for important security headers
        security_headers = [
            'X-Frame-Options',
            'X-Content-Type-Options',
            'X-XSS-Protection',
            'Strict-Transport-Security'
        ]
        for header in security_headers:
            self.assertIn(header, response.headers, f"Missing security header: {header}")

    def test_content_security_policy(self):
        """
        Test Content Security Policy implementation.
        """
        response = self.session.get(f"{self.base_url}/")
        csp_header = response.headers.get('Content-Security-Policy', '')
        # Should have CSP header
        self.assertTrue(csp_header, "Content-Security-Policy header not found")
        # Should not allow 'unsafe-inline' or 'unsafe-eval' by default
        self.assertNotIn("'unsafe-inline'", csp_header.lower())
        self.assertNotIn("'unsafe-eval'", csp_header.lower())
# security_tests/infrastructure_security_tests.py
import boto3
import unittest
from unittest.mock import patch, MagicMock
from moto import mock_ec2, mock_s3, mock_iam
class InfrastructureSecurityTests(unittest.TestCase):
    """
    Security tests for infrastructure components.

    AWS calls are intercepted by moto's mock decorators, so the tests run
    without touching real infrastructure.
    """

    @mock_ec2
    def test_security_group_restrictions(self):
        """
        Test that security groups are properly restrictive: no rule may
        expose a sensitive port to all of IPv4 (0.0.0.0/0).
        """
        ec2 = boto3.resource('ec2', region_name='us-east-1')
        # Create a test security group
        sg = ec2.create_security_group(
            GroupName='test-sg',
            Description='Test security group for security testing'
        )
        # Test that the security group doesn't allow all traffic by default;
        # that would be a security misconfiguration.
        for permission in sg.ip_permissions:
            # Check if it allows all IPs (0.0.0.0/0) for sensitive ports
            for ip_range in permission.get('IpRanges', []):
                if ip_range.get('CidrIp') == '0.0.0.0/0':
                    sensitive_ports = [22, 3389, 80, 443]  # SSH, RDP, HTTP, HTTPS
                    if permission.get('FromPort') in sensitive_ports:
                        self.fail(f"Security group allows all IPs on sensitive port {permission.get('FromPort')}")

    @mock_s3
    def test_s3_bucket_security(self):
        """
        Test S3 bucket security configurations: all four public-access-block
        settings must be enabled.
        """
        s3 = boto3.client('s3', region_name='us-east-1')
        # Create a test bucket
        bucket_name = 'test-security-bucket'
        s3.create_bucket(Bucket=bucket_name)
        # Check that public access is blocked
        public_access_block = s3.get_public_access_block(Bucket=bucket_name)
        block_config = public_access_block['PublicAccessBlockConfiguration']
        self.assertTrue(block_config['BlockPublicAcls'])
        self.assertTrue(block_config['BlockPublicPolicy'])
        self.assertTrue(block_config['IgnorePublicAcls'])
        self.assertTrue(block_config['RestrictPublicBuckets'])

    @mock_iam
    def test_iam_policy_restrictions(self):
        """
        Test IAM policy security: attach a policy to a user and verify the
        attached policies are retrievable for analysis.
        """
        # FIX: json was used below but never imported in this module's
        # import block — that raised NameError at runtime.
        import json

        iam = boto3.client('iam', region_name='us-east-1')
        # Create a test user
        user_name = 'test-user'
        iam.create_user(UserName=user_name)
        # Deliberately broad policy ("Resource": "*") to exercise the check.
        policy_doc = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": "s3:GetObject",
                    "Resource": "*"
                }
            ]
        }
        policy_name = 'test-policy'
        policy_arn = iam.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(policy_doc)
        )['Policy']['Arn']
        iam.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
        # Test that overly permissive policies are flagged
        attached_policies = iam.list_attached_user_policies(UserName=user_name)
        for policy in attached_policies['AttachedPolicies']:
            policy_doc = iam.get_policy(PolicyArn=policy['PolicyArn'])
            # A production test would analyze the policy document for
            # wildcards; here we only ensure the policy is inspectable.
            self.assertIsNotNone(policy_doc)
# Run security tests as part of CI/CD
def run_security_test_suite():
    """
    Run the application and infrastructure security suites together.

    Returns True when every test passed, False otherwise (suitable for
    driving a CI exit code).
    """
    loader = unittest.TestLoader()
    # Combine both suites into one run so a single result object decides.
    combined = unittest.TestSuite([
        loader.loadTestsFromTestCase(ApplicationSecurityTests),
        loader.loadTestsFromTestCase(InfrastructureSecurityTests),
    ])
    outcome = unittest.TextTestRunner(verbosity=2).run(combined)
    return outcome.wasSuccessful()
if __name__ == '__main__':
    # Entry point for CI: the exit code below propagates the suite result.
    success = run_security_test_suite()
    exit(0 if success else 1)

Security Testing Types and Techniques
Static Application Security Testing (SAST)
SAST Implementation and Best Practices
PYTHON
# Example: Custom SAST scanner implementation
import ast
import re
from typing import List, Dict, Any
from pathlib import Path
import json
class CustomSASTScanner:
    """
    Custom Static Application Security Testing scanner.

    Combines line-oriented regex rules (language agnostic) with Python AST
    inspection (structure aware) and aggregates the findings into a
    JSON-serializable report.
    """

    def __init__(self):
        self.rules = self.load_security_rules()
        self.findings = []

    @staticmethod
    def _timestamp() -> str:
        """
        Current UTC time in ISO-8601.

        FIX: replaces the stale hardcoded '2023-12-01T10:00:00Z' constants
        so every finding records when the scan actually ran.
        """
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).isoformat()

    def load_security_rules(self) -> Dict[str, Any]:
        """
        Load security rules for scanning: rule name -> patterns/severity/description.
        """
        return {
            'sql_injection': {
                'patterns': [
                    r'(cursor\.execute|\.query|\.find)\([^)]*[\+\%]\s*[\'"]',
                    r'(select|insert|update|delete)\s+.*\s+where\s+.*\s*[=\+\%]',
                ],
                'severity': 'high',
                'description': 'Potential SQL injection vulnerability'
            },
            'xss': {
                'patterns': [
                    r'document\.write\(',
                    r'innerHTML\s*=',
                    r'outerHTML\s*=',
                    r'eval\(',
                    r'Function\('
                ],
                'severity': 'high',
                'description': 'Potential Cross-Site Scripting vulnerability'
            },
            'hardcoded_credentials': {
                'patterns': [
                    r'(password|secret|key|token|auth)[\s=:]+["\'][^"\']+["\']',
                    r'(os\.environ\.get|os\.getenv)\([^)]*(password|secret|key|token|auth)',
                ],
                'severity': 'critical',
                'description': 'Hardcoded credentials detected'
            },
            'path_traversal': {
                'patterns': [
                    r'os\.path\.join\([^)]*\+\s*[\'"][\.\.\/]',
                    r'open\([^)]*[\+\%]\s*[\'"][\.\.\/]',
                ],
                'severity': 'high',
                'description': 'Potential path traversal vulnerability'
            }
        }

    def scan_python_file(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Scan a Python file for security vulnerabilities.

        Regex rules run first, then AST checks (when the file parses).
        A failed scan yields a single 'scan_error' record instead of raising.
        """
        findings = []
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            lines = content.split('\n')
            # Parse the AST to understand code structure; on a syntax error
            # the regex rules still run.
            try:
                tree = ast.parse(content)
            except SyntaxError:
                tree = None
            # Check each rule against each line.
            for rule_name, rule_config in self.rules.items():
                for pattern in rule_config['patterns']:
                    compiled_pattern = re.compile(pattern, re.IGNORECASE)
                    for line_num, line in enumerate(lines, 1):
                        for match in compiled_pattern.finditer(line):
                            findings.append({
                                'rule': rule_name,
                                'severity': rule_config['severity'],
                                'description': rule_config['description'],
                                'file_path': file_path,
                                'line_number': line_num,
                                'column_start': match.start(),
                                'column_end': match.end(),
                                'code_snippet': line.strip(),
                                'pattern_matched': match.group(0),
                                'timestamp': self._timestamp()
                            })
            # Additional AST-based analysis
            if tree:
                findings.extend(self.analyze_ast(tree, file_path, lines))
        except Exception as e:
            findings.append({
                'rule': 'scan_error',
                'severity': 'error',
                'description': f'Scan error: {str(e)}',
                'file_path': file_path,
                'line_number': 0,
                'code_snippet': '',
                'timestamp': self._timestamp()
            })
        return findings

    def _ast_finding(self, rule: str, severity: str, description: str,
                     file_path: str, node: ast.AST, lines: List[str]) -> Dict[str, Any]:
        """Build one finding record for an AST node (shared by all AST checks)."""
        line_num = node.lineno
        return {
            'rule': rule,
            'severity': severity,
            'description': description,
            'file_path': file_path,
            'line_number': line_num,
            'column_start': getattr(node, 'col_offset', 0),
            'code_snippet': lines[line_num - 1].strip() if line_num <= len(lines) else '',
            'timestamp': self._timestamp()
        }

    def analyze_ast(self, tree: ast.AST, file_path: str, lines: List[str]) -> List[Dict[str, Any]]:
        """
        Analyze a Python AST for security issues: direct eval()/exec() calls
        and `from os import system`.
        """
        findings = []
        for node in ast.walk(tree):
            # eval()/exec() execute arbitrary strings as code.
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                if node.func.id == 'eval':
                    findings.append(self._ast_finding(
                        'dangerous_eval', 'critical',
                        'Use of eval() function is dangerous',
                        file_path, node, lines))
                elif node.func.id == 'exec':
                    findings.append(self._ast_finding(
                        'dangerous_exec', 'critical',
                        'Use of exec() function is dangerous',
                        file_path, node, lines))
            # `from os import system` pulls shell execution into scope.
            if isinstance(node, ast.ImportFrom) and node.module == 'os':
                for alias in node.names:
                    if alias.name == 'system':
                        findings.append(self._ast_finding(
                            'dangerous_import', 'high',
                            'Import of dangerous os.system() function',
                            file_path, node, lines))
        return findings

    def scan_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """
        Recursively scan every *.py file under *directory_path*.
        """
        all_findings = []
        for py_file in Path(directory_path).rglob("*.py"):
            all_findings.extend(self.scan_python_file(str(py_file)))
        return all_findings

    def generate_report(self, findings: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Generate a security report: totals, per-severity and per-rule counts,
        plus the raw finding records.
        """
        severities = ('critical', 'high', 'medium', 'low', 'error')
        report = {
            'scan_timestamp': self._timestamp(),
            'total_findings': len(findings),
            'findings_by_severity': {
                level: len([f for f in findings if f['severity'] == level])
                for level in severities
            },
            'findings_by_rule': {},
            'detailed_findings': findings
        }
        # Count findings by rule
        for finding in findings:
            rule = finding['rule']
            report['findings_by_rule'][rule] = report['findings_by_rule'].get(rule, 0) + 1
        return report
# Example usage of the SAST scanner
def run_custom_sast_scan():
    """
    Scan ./src/ with CustomSASTScanner, print a severity summary and write
    the full report to sast-report.json; returns the report dict.
    """
    scanner = CustomSASTScanner()
    report = scanner.generate_report(scanner.scan_directory("./src/"))

    # Console summary, one line per severity bucket.
    print(f"Total findings: {report['total_findings']}")
    for level in ('critical', 'high', 'medium', 'low'):
        print(f"{level.capitalize()}: {report['findings_by_severity'][level]}")

    # Persist the detailed report for downstream tooling.
    with open('sast-report.json', 'w') as f:
        json.dump(report, f, indent=2)
    return report
# Example vulnerable code to test the scanner
# NOTE: this snippet is deliberately insecure; it exists only to exercise
# each rule in CustomSASTScanner and must never be imported or executed.
vulnerable_code_example = '''
import os
import sqlite3
def get_user_data(user_id):
conn = sqlite3.connect("users.db")
cursor = conn.cursor()
# Vulnerable to SQL injection
query = "SELECT * FROM users WHERE id = " + user_id
cursor.execute(query)
return cursor.fetchall()
def display_message(message):
# Vulnerable to XSS
html = "<div>" + message + "</div>"
document.write(html)
def dangerous_function(user_input):
# Dangerous eval usage
result = eval(user_input)
return result
# Hardcoded credentials
API_KEY = "sk-very-secret-key-do-not-commit"
def path_traversal(filename):
# Vulnerable to path traversal
filepath = os.path.join("/safe/directory/", filename)
with open(filepath, "r") as f:
return f.read()
'''
# Save example to test file
with open('test_vulnerable.py', 'w') as f:
    f.write(vulnerable_code_example)
# Run the scanner on the test file and print one summary line per finding
scanner = CustomSASTScanner()
findings = scanner.scan_python_file('test_vulnerable.py')
print(f"Findings in test file: {len(findings)}")
for finding in findings:
    print(f"Rule: {finding['rule']}, Severity: {finding['severity']}, Line: {finding['line_number']}")

Dynamic Application Security Testing (DAST)
DAST Implementation and Automation
PYTHON
# Example: DAST scanner implementation
import requests
import urllib3
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import re
from typing import List, Dict, Any, Set
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class DASTScanner:
"""
Dynamic Application Security Testing scanner
"""
    def __init__(self, base_url: str, max_threads: int = 5):
        """
        Set up the scanner session and result accumulators.

        :param base_url: root URL of the target application
        :param max_threads: worker cap for concurrent scanning
        """
        self.base_url = base_url
        # Shared session: connection pooling plus an identifiable scanner UA.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'DevSecOps-DAST-Scanner/1.0',
            'Accept': '*/*',
            'Accept-Language': 'en-US,en;q=0.9'
        })
        self.max_threads = max_threads
        self.visited_urls: Set[str] = set()            # URLs already fetched
        self.forms_found: List[Dict[str, Any]] = []    # discovered HTML forms
        self.links_found: List[str] = []               # all same-domain URLs seen
        self.security_issues: List[Dict[str, Any]] = []
    def crawl_application(self, max_pages: int = 100) -> List[str]:
        """
        Breadth-first crawl from base_url to discover same-domain URLs and forms.

        Populates self.visited_urls, self.forms_found and self.links_found;
        returns the discovered URL list (capped at *max_pages*).
        """
        urls_to_visit = [self.base_url]
        discovered_urls = {self.base_url}
        while urls_to_visit and len(discovered_urls) < max_pages:
            current_url = urls_to_visit.pop(0)
            if current_url in self.visited_urls:
                continue
            try:
                # verify=False: scans often target test envs with self-signed certs.
                response = self.session.get(current_url, timeout=10, verify=False)
                self.visited_urls.add(current_url)
                soup = BeautifulSoup(response.content, 'html.parser')
                # Find all links
                for link in soup.find_all('a', href=True):
                    absolute_url = urljoin(current_url, link['href'])
                    parsed_url = urlparse(absolute_url)
                    # Only follow links within the same domain
                    if parsed_url.netloc == urlparse(self.base_url).netloc:
                        if absolute_url not in discovered_urls:
                            discovered_urls.add(absolute_url)
                            urls_to_visit.append(absolute_url)
                # Find all forms (action, method, named input fields)
                for form in soup.find_all('form'):
                    form_action = form.get('action', '')
                    if form_action:
                        form_url = urljoin(current_url, form_action)
                        self.forms_found.append({
                            'url': form_url,
                            'method': form.get('method', 'GET').upper(),
                            'fields': [input_tag.get('name') for input_tag in form.find_all(['input', 'textarea', 'select']) if input_tag.get('name')],
                            'source_page': current_url
                        })
            except Exception as e:
                # Best-effort crawl: log and keep going on per-URL failures.
                print(f"Error crawling {current_url}: {str(e)}")
        self.links_found = list(discovered_urls)
        return self.links_found
    def test_sql_injection(self, url: str, method: str = 'GET', params: Dict[str, str] = None) -> List[Dict[str, Any]]:
        """
        Test for SQL injection vulnerabilities.

        Each key in *params* is replaced, one at a time, with classic SQL
        payloads; a response containing database-error text flags the
        parameter.  GET probes use the query string, POST probes the form
        body.  Probing of a parameter stops at its first hit.
        """
        issues = []
        # SQL injection payloads
        payloads = [
            "' OR '1'='1",
            "' OR '1'='1' --",
            "' OR 1=1--",
            "' UNION SELECT NULL--",
            "'; DROP TABLE users--",
            "' AND 1=2 UNION SELECT 1,2,3--"
        ]
        # Test parameters for GET requests
        if method == 'GET':
            for param_name in params or []:
                for payload in payloads:
                    test_params = (params or {}).copy()
                    test_params[param_name] = payload
                    try:
                        response = self.session.get(url, params=test_params, timeout=10, verify=False)
                        # Check for SQL error indicators
                        error_indicators = [
                            'sql syntax', 'mysql', 'postgresql', 'sqlite', 'oracle',
                            'error', 'exception', 'traceback', 'at line', 'near',
                            'syntax error', 'database error'
                        ]
                        response_text = response.text.lower()
                        if any(indicator in response_text for indicator in error_indicators):
                            issues.append({
                                'type': 'SQL Injection',
                                'severity': 'high',
                                'url': url,
                                'method': method,
                                'parameter': param_name,
                                'payload': payload,
                                'response_status': response.status_code,
                                'evidence': [indicator for indicator in error_indicators if indicator in response_text],
                                'timestamp': '2023-12-01T10:00:00Z'
                            })
                            break  # Stop testing this parameter if vulnerability found
                    except Exception:
                        continue
        # Test form submissions for POST requests
        if method == 'POST':
            for form_field in params or []:
                for payload in payloads:
                    test_data = (params or {}).copy()
                    test_data[form_field] = payload
                    try:
                        response = self.session.post(url, data=test_data, timeout=10, verify=False)
                        response_text = response.text.lower()
                        # NOTE: slightly narrower indicator list than the GET
                        # branch (as published).
                        error_indicators = [
                            'sql syntax', 'mysql', 'postgresql', 'sqlite', 'oracle',
                            'error', 'exception', 'traceback', 'at line', 'near'
                        ]
                        if any(indicator in response_text for indicator in error_indicators):
                            issues.append({
                                'type': 'SQL Injection',
                                'severity': 'high',
                                'url': url,
                                'method': method,
                                'field': form_field,
                                'payload': payload,
                                'response_status': response.status_code,
                                'evidence': [indicator for indicator in error_indicators if indicator in response_text],
                                'timestamp': '2023-12-01T10:00:00Z'
                            })
                            break
                    except Exception:
                        continue
        return issues
def test_xss(self, url: str, method: str = 'GET', params: Dict[str, str] = None) -> List[Dict[str, Any]]:
"""
Test for Cross-Site Scripting vulnerabilities
"""
issues = []
# XSS payloads
payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"<svg onload=alert('XSS')>",
"javascript:alert('XSS')",
"<iframe srcdoc='<script>alert(`XSS`)</script>'></iframe>",
"<body onload=alert('XSS')>"
]
# Test parameters
for param_name in params or []:
for payload in payloads:
test_params = (params or {}).copy()
test_params[param_name] = payload
try:
response = self.session.get(url, params=test_params, timeout=10, verify=False)
# Check if payload is reflected in response
if payload in response.text:
issues.append({
'type': 'Cross-Site Scripting',
'severity': 'high',
'url': url,
'method': method,
'parameter': param_name,
'payload': payload,
'response_status': response.status_code,
'reflected': True,
'timestamp': '2023-12-01T10:00:00Z'
})
break # Stop testing this parameter if XSS found
except Exception:
continue
return issues
    def test_security_headers(self, url: str) -> List[Dict[str, Any]]:
        """
        Report well-known security headers absent from *url*'s response.
        """
        issues = []
        try:
            response = self.session.get(url, timeout=10, verify=False)
            headers = response.headers
            # Header name -> human-readable description of the protection lost.
            required_headers = [
                ('X-Frame-Options', 'Missing X-Frame-Options header (clickjacking protection)'),
                ('X-Content-Type-Options', 'Missing X-Content-Type-Options header (MIME-type sniffing protection)'),
                ('X-XSS-Protection', 'Missing X-XSS-Protection header (XSS filter)'),
                ('Strict-Transport-Security', 'Missing Strict-Transport-Security header (HSTS)'),
                ('Content-Security-Policy', 'Missing Content-Security-Policy header (CSP)')
            ]
            for header_name, description in required_headers:
                if header_name not in headers:
                    issues.append({
                        'type': 'Missing Security Header',
                        'severity': 'medium',
                        'url': url,
                        'header': header_name,
                        'description': description,
                        'timestamp': '2023-12-01T10:00:00Z'
                    })
        except Exception as e:
            # Request failures are reported too, so the scan summary is complete.
            issues.append({
                'type': 'Security Header Test Error',
                'severity': 'error',
                'url': url,
                'error': str(e),
                'timestamp': '2023-12-01T10:00:00Z'
            })
        return issues
def test_directory_listing(self, url: str) -> List[Dict[str, Any]]:
"""
Test for directory listing vulnerabilities
"""
issues = []
# Common directory names to test
common_dirs = [
'admin/', 'backup/', 'config/', 'logs/', 'tmp/', 'uploads/',
'includes/', 'libraries/', 'vendor/', 'node_modules/'
]
for directory in common_dirs:
test_url = urljoin(url, directory)
try:
response = self.session.get(test_url, timeout=10, verify=False)
# Check for directory listing indicators
if response.status_code == 200:
response_text = response.text.lower()
indicators = [
'index of /', 'parent directory', 'directory listing',
'<title>index of', 'name size date'
]
if any(indicator in response_text for indicator in indicators):
issues.append({
'type': 'Directory Listing',
'severity': 'medium',
'url': test_url,
'description': 'Directory listing is enabled',
'timestamp': '2023-12-01T10:00:00Z'
})
except Exception:
continue
return issues
def run_comprehensive_scan(self) -> List[Dict[str, Any]]:
    """
    Run the full DAST pass against the configured target.

    Crawls the application first, then checks every discovered URL
    (security headers, directory listing) and every discovered form
    (SQL injection, XSS). Findings are stored on ``self.security_issues``
    and also returned.
    """
    print(f"Starting DAST scan of {self.base_url}")

    # Discovery phase: populates self.links_found / self.forms_found.
    print("Crawling application...")
    self.crawl_application()
    print(f"Discovered {len(self.links_found)} URLs and {len(self.forms_found)} forms")

    findings: List[Dict[str, Any]] = []

    # Per-URL checks.
    for target in self.links_found:
        print(f"Testing URL: {target}")
        findings.extend(self.test_security_headers(target))
        findings.extend(self.test_directory_listing(target))

    # Per-form injection checks; each tester gets its own benign seed
    # dictionary so neither can see the other's mutations.
    for form in self.forms_found:
        print(f"Testing form: {form['url']} ({form['method']})")
        findings.extend(self.test_sql_injection(
            form['url'],
            form['method'],
            {field: 'test_value' for field in form['fields']}
        ))
        findings.extend(self.test_xss(
            form['url'],
            form['method'],
            {field: 'test_value' for field in form['fields']}
        ))

    self.security_issues = findings
    return findings
def generate_report(self) -> Dict[str, Any]:
    """
    Summarize the findings of the last DAST scan.

    Returns:
        Report dict with scan metadata, per-severity counts, per-type
        counts, and the full issue list under 'detailed_issues'.
    """
    # Single pass over the issues for both tallies, instead of one list
    # comprehension per severity level plus a second loop for types.
    severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0, 'error': 0}
    issues_by_type: Dict[str, int] = {}
    for issue in self.security_issues:
        severity = issue['severity']
        if severity in severity_counts:
            severity_counts[severity] += 1
        issues_by_type[issue['type']] = issues_by_type.get(issue['type'], 0) + 1
    return {
        # Fixed: report the actual generation time instead of the
        # hard-coded placeholder '2023-12-01T10:00:00Z'.
        'scan_timestamp': datetime.utcnow().isoformat() + 'Z',
        'target_url': self.base_url,
        'urls_scanned': len(self.links_found),
        'forms_scanned': len(self.forms_found),
        'total_issues': len(self.security_issues),
        'issues_by_severity': severity_counts,
        'issues_by_type': issues_by_type,
        'detailed_issues': self.security_issues
    }
# Example usage
def run_dast_scan_example():
    """
    Demonstrate a full DAST run: scan, print a summary, and persist the
    report to ``dast-report.json``.
    """
    scanner = DASTScanner("https://example.com")

    # Crawl and test everything the scanner can reach.
    issues = scanner.run_comprehensive_scan()

    # Summarize and display the findings.
    report = scanner.generate_report()
    print(f"\nDAST Scan Results:")
    print(f"URLs scanned: {report['urls_scanned']}")
    print(f"Forms scanned: {report['forms_scanned']}")
    print(f"Total issues found: {report['total_issues']}")
    print(f"High severity: {report['issues_by_severity']['high']}")
    print(f"Medium severity: {report['issues_by_severity']['medium']}")

    # Persist the full report for downstream tooling.
    import json
    with open('dast-report.json', 'w') as f:
        json.dump(report, f, indent=2)
    return report
# Run example (commented out to prevent actual scanning)
# report = run_dast_scan_example()
Interactive Application Security Testing (IAST)
IAST Implementation and Integration
PYTHON
# Example: IAST agent implementation
import sys
import traceback
from typing import Dict, Any, List, Optional
import json
import threading
import time
from datetime import datetime
import inspect
class IASTAgent:
    """
    Interactive Application Security Testing agent.

    Wraps application methods via :meth:`instrument_method`, heuristically
    marks user-looking argument values as tainted, and runs a set of
    per-vulnerability detectors after each instrumented call. Everything the
    agent observes is accumulated in ``self.security_events`` and summarized
    by :meth:`generate_report`.
    """
    def __init__(self):
        # Chronological list of all observed security events.
        self.security_events = []
        # NOTE(review): this flag is set but never consulted by the taint
        # helpers below -- confirm whether it should gate mark_as_tainted().
        self.taint_tracking_enabled = True
        # Detector registry: each callable inspects one completed call.
        self.vulnerability_detectors = {
            'sql_injection': self.detect_sql_injection,
            'xss': self.detect_xss,
            'path_traversal': self.detect_path_traversal,
            'command_injection': self.detect_command_injection
        }
        # id(value) -> taint record; see mark_as_tainted()/is_tainted().
        self.tracked_variables = {}
        self.session_id = self.generate_session_id()

    def generate_session_id(self) -> str:
        """
        Generate unique session identifier (random UUID4 string).
        """
        import uuid
        return str(uuid.uuid4())

    def instrument_method(self, method):
        """
        Decorator to instrument methods for security monitoring.

        The returned wrapper records call metadata, taints user-looking
        positional and keyword arguments, runs the vulnerability detectors
        after the wrapped call, and logs (then re-raises) any exception
        raised by the wrapped method.
        """
        def wrapper(*args, **kwargs):
            # Track method entry
            method_info = {
                'method_name': method.__name__,
                'class_name': method.__qualname__.split('.')[0] if '.' in method.__qualname__ else 'module',
                'args': str(args)[:1000],  # Limit length
                'kwargs': str(kwargs)[:1000],  # Limit length
                'timestamp': datetime.utcnow().isoformat(),
                'thread_id': threading.current_thread().ident
            }
            # Mark arguments as potentially tainted
            for i, arg in enumerate(args):
                if self.is_user_input(arg):
                    self.mark_as_tainted(arg, f'arg_{i}', method_info)
            for key, value in kwargs.items():
                if self.is_user_input(value):
                    self.mark_as_tainted(value, f'kwarg_{key}', method_info)
            try:
                result = method(*args, **kwargs)
                # Check for vulnerabilities after method execution
                self.check_for_vulnerabilities(method_info, args, kwargs, result)
                return result
            except Exception as e:
                # Log exceptions for security analysis
                self.log_security_event('exception_occurred', {
                    'method': method.__name__,
                    'exception': str(e),
                    'traceback': traceback.format_exc(),
                    'timestamp': datetime.utcnow().isoformat()
                })
                raise
        # Preserve original method attributes (manual subset of functools.wraps).
        wrapper.__name__ = method.__name__
        wrapper.__doc__ = method.__doc__
        return wrapper

    def is_user_input(self, value) -> bool:
        """
        Determine if a value comes from user input.

        NOTE(review): this is a coarse heuristic -- any non-empty,
        non-dunder string is treated as user input, so internal strings
        will be tainted too. Confirm whether a real taint source (request
        object, form data) should drive this instead.
        """
        if value is None:
            return False
        # Check if it's a string that could be user input
        if isinstance(value, str):
            # Common indicators of user input
            return len(value) > 0 and not value.startswith('__')  # Not a system value
        # Check if it's from common user input sources
        str_val = str(value)
        return any(source in str_val.lower() for source in [
            'user_', 'input_', 'request_', 'param_', 'query_'
        ])

    def mark_as_tainted(self, value, label: str, context: Dict[str, Any]):
        """
        Mark a value as potentially tainted.

        Records are keyed by ``id(value)``; holding the value in the record
        also keeps it alive for the agent's lifetime.
        """
        value_id = id(value)
        self.tracked_variables[value_id] = {
            'value': value,
            'label': label,
            'context': context,
            'tainted': True,
            'sources': [context.get('method_name', 'unknown')]
        }

    def is_tainted(self, value) -> bool:
        """
        Check if a value is marked as tainted (identity-based lookup,
        not equality).
        """
        value_id = id(value)
        return value_id in self.tracked_variables and self.tracked_variables[value_id]['tainted']

    def check_for_vulnerabilities(self, method_info: Dict[str, Any], args, kwargs, result):
        """
        Run every registered detector against a completed call.

        A failing detector is logged as a 'detector_error' event instead of
        propagating, so one broken detector cannot break the application.
        """
        # Check each vulnerability type
        for vuln_type, detector in self.vulnerability_detectors.items():
            try:
                vulnerabilities = detector(method_info, args, kwargs, result)
                for vuln in vulnerabilities:
                    self.log_security_event(vuln_type, vuln)
            except Exception as e:
                self.log_security_event('detector_error', {
                    'detector': vuln_type,
                    'error': str(e),
                    'method': method_info['method_name']
                })

    def detect_sql_injection(self, method_info: Dict[str, Any], args, kwargs, result) -> List[Dict[str, Any]]:
        """
        Detect potential SQL injection vulnerabilities.

        Heuristic: the method name must look like a database operation AND a
        tainted string argument must contain an SQL keyword.
        """
        vulnerabilities = []
        # Check if method name suggests database operation
        method_name = method_info['method_name'].lower()
        if any(keyword in method_name for keyword in ['query', 'execute', 'select', 'insert', 'update', 'delete']):
            # Check arguments for tainted values
            for i, arg in enumerate(args):
                if self.is_tainted(arg) and isinstance(arg, str):
                    if any(sql_keyword in arg.upper() for sql_keyword in ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'UNION']):
                        vulnerabilities.append({
                            'type': 'SQL Injection',
                            'severity': 'high',
                            'method': method_info['method_name'],
                            'argument_index': i,
                            'tainted_value': str(arg)[:200],  # Limit length
                            'context': method_info,
                            'timestamp': datetime.utcnow().isoformat()
                        })
        return vulnerabilities

    def detect_xss(self, method_info: Dict[str, Any], args, kwargs, result) -> List[Dict[str, Any]]:
        """
        Detect potential XSS vulnerabilities.

        Heuristic: the method name must look like output/rendering AND a
        tainted string argument must contain a script-like fragment.
        """
        vulnerabilities = []
        # Check if method name suggests output/rendering
        method_name = method_info['method_name'].lower()
        if any(keyword in method_name for keyword in ['render', 'display', 'show', 'output', 'write', 'response']):
            # Check arguments for tainted values that might contain scripts
            for i, arg in enumerate(args):
                if self.is_tainted(arg) and isinstance(arg, str):
                    if any(script_tag in arg.lower() for script_tag in ['<script', 'javascript:', 'onerror=', 'onclick=']):
                        vulnerabilities.append({
                            'type': 'Cross-Site Scripting',
                            'severity': 'high',
                            'method': method_info['method_name'],
                            'argument_index': i,
                            'tainted_value': str(arg)[:200],  # Limit length
                            'context': method_info,
                            'timestamp': datetime.utcnow().isoformat()
                        })
        return vulnerabilities

    def detect_path_traversal(self, method_info: Dict[str, Any], args, kwargs, result) -> List[Dict[str, Any]]:
        """
        Detect potential path traversal vulnerabilities.

        Heuristic: the method name must look like a file operation AND a
        tainted string argument must contain a parent-directory reference.
        """
        vulnerabilities = []
        # Check if method name suggests file operations
        method_name = method_info['method_name'].lower()
        if any(keyword in method_name for keyword in ['file', 'read', 'write', 'open', 'download', 'upload', 'include']):
            # Check arguments for tainted values containing path traversal
            for i, arg in enumerate(args):
                if self.is_tainted(arg) and isinstance(arg, str):
                    # NOTE(review): the first '..' test already subsumes the
                    # two more specific '../' and '..\\' checks.
                    if '..' in arg or '../' in arg or '..\\' in arg:
                        vulnerabilities.append({
                            'type': 'Path Traversal',
                            'severity': 'high',
                            'method': method_info['method_name'],
                            'argument_index': i,
                            'tainted_value': str(arg)[:200],  # Limit length
                            'context': method_info,
                            'timestamp': datetime.utcnow().isoformat()
                        })
        return vulnerabilities

    def detect_command_injection(self, method_info: Dict[str, Any], args, kwargs, result) -> List[Dict[str, Any]]:
        """
        Detect potential command injection vulnerabilities.

        Heuristic: the method name must look like command execution AND a
        tainted string argument must contain shell metacharacters.
        """
        vulnerabilities = []
        # Check if method name suggests system/command execution
        method_name = method_info['method_name'].lower()
        if any(keyword in method_name for keyword in ['exec', 'system', 'shell', 'command', 'run', 'subprocess']):
            # Check arguments for tainted values
            for i, arg in enumerate(args):
                if self.is_tainted(arg) and isinstance(arg, str):
                    # NOTE(review): ')' alone matches any closing parenthesis,
                    # not only '$(...)' substitution -- likely over-broad.
                    if any(cmd_char in arg for cmd_char in [';', '|', '&', '`', '$(', ')']):
                        vulnerabilities.append({
                            'type': 'Command Injection',
                            'severity': 'critical',
                            'method': method_info['method_name'],
                            'argument_index': i,
                            'tainted_value': str(arg)[:200],  # Limit length
                            'context': method_info,
                            'timestamp': datetime.utcnow().isoformat()
                        })
        return vulnerabilities

    def log_security_event(self, event_type: str, details: Dict[str, Any]):
        """
        Log a security event, stamped with this agent's session id.
        """
        event = {
            'event_type': event_type,
            'details': details,
            'session_id': self.session_id,
            'timestamp': datetime.utcnow().isoformat()
        }
        self.security_events.append(event)
        # Print to console for immediate visibility (in real implementation, this might go to a security SIEM)
        print(f"SECURITY EVENT [{event_type}]: {details}")

    def get_security_events(self) -> List[Dict[str, Any]]:
        """
        Get all collected security events.
        """
        return self.security_events

    def generate_report(self) -> Dict[str, Any]:
        """
        Generate IAST report: totals, per-severity and per-type counts,
        and the full event list under 'detailed_events'.
        """
        events = self.security_events
        report = {
            'session_id': self.session_id,
            'report_timestamp': datetime.utcnow().isoformat(),
            'total_events': len(events),
            'events_by_type': {},
            'events_by_severity': {
                'critical': len([e for e in events if e['details'].get('severity') == 'critical']),
                'high': len([e for e in events if e['details'].get('severity') == 'high']),
                'medium': len([e for e in events if e['details'].get('severity') == 'medium']),
                'low': len([e for e in events if e['details'].get('severity') == 'low'])
            },
            'detailed_events': events
        }
        # Count events by type
        for event in events:
            event_type = event['event_type']
            if event_type not in report['events_by_type']:
                report['events_by_type'][event_type] = 0
            report['events_by_type'][event_type] += 1
        return report
# Example application code with IAST instrumentation
class UserService:
    """
    Example service class demonstrating IAST instrumentation.

    Fixed: the original applied ``@iast_agent.instrument_method`` as a
    class-body decorator, but ``iast_agent`` is an ``__init__`` parameter
    and does not exist when the class body executes, so merely defining the
    class raised ``NameError``. Instrumentation is now applied per instance
    in ``__init__``, which also lets each service use its own agent.
    """
    def __init__(self, iast_agent: "IASTAgent"):
        self.iast_agent = iast_agent
        # Wrap the vulnerable methods with the agent's monitoring wrapper
        # at construction time (see class docstring for why).
        self.get_user_by_id = iast_agent.instrument_method(self.get_user_by_id)
        self.display_user_profile = iast_agent.instrument_method(self.display_user_profile)
        self.read_user_file = iast_agent.instrument_method(self.read_user_file)

    def get_user_by_id(self, user_id: str):
        """
        Get user by ID (deliberately vulnerable to SQL injection: the id is
        interpolated straight into the query string).
        """
        # Simulate database query - this would be vulnerable if user_id is not sanitized
        query = f"SELECT * FROM users WHERE id = {user_id}"
        print(f"Executing query: {query}")
        return {"id": user_id, "name": "John Doe"}

    def display_user_profile(self, user_data: Dict[str, str]):
        """
        Display user profile (deliberately vulnerable to XSS: the name is
        embedded in HTML without escaping).
        """
        # Simulate HTML output - this would be vulnerable to XSS
        html_output = f"<div>User: {user_data.get('name', '')}</div>"
        print(f"Displaying: {html_output}")
        return html_output

    def read_user_file(self, filename: str):
        """
        Read user file (deliberately vulnerable to path traversal: the
        filename is joined without normalization or containment checks).
        """
        # Fixed: the f-string previously contained no placeholder, so the
        # requested filename was silently ignored.
        safe_path = f"/safe/user/files/{filename}"
        print(f"Reading file: {safe_path}")
        return "File contents"
# Example usage
def run_iast_example():
    """
    Demonstrate IAST instrumentation end to end: exercise deliberately
    vulnerable operations with classic attack payloads, then write the
    agent's findings to ``iast-report.json``.
    """
    agent = IASTAgent()
    service = UserService(agent)

    print("Testing IAST with simulated user inputs...")
    # Each call carries the payload its detector is meant to catch.
    service.get_user_by_id("1 OR 1=1")                                       # SQL injection
    service.display_user_profile({"name": "<script>alert('XSS')</script>"})  # XSS
    service.read_user_file("../../etc/passwd")                               # path traversal

    # Summarize and display the agent's findings.
    report = agent.generate_report()
    print(f"\nIAST Report:")
    print(f"Total events: {report['total_events']}")
    print(f"Critical: {report['events_by_severity']['critical']}")
    print(f"High: {report['events_by_severity']['high']}")
    print(f"Events by type: {report['events_by_type']}")

    # Persist the full report for downstream tooling.
    with open('iast-report.json', 'w') as f:
        json.dump(report, f, indent=2)
    return report
# Run example
# report = run_iast_example()
Security Testing Automation
Automated Security Testing Pipeline
Security Testing Orchestration
YAML
# Example: Security testing pipeline configuration
# GitHub Actions workflow: four jobs covering static analysis (SAST),
# dynamic analysis (DAST), software composition analysis (SCA), and a final
# policy gate that aggregates the other jobs' artifacts.
name: Security Testing Pipeline

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  # Static analysis: Bandit (code), Safety (dependencies), plus a custom SAST tool.
  security-static-analysis:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install bandit flake8 safety
      - name: Run Bandit security scan
        run: |
          bandit -r . -f json -o bandit-results.json
      - name: Run Safety dependency check
        run: |
          safety check -r requirements.txt --json > safety-results.json
      - name: Run custom SAST scan
        run: |
          python security_tools/custom_sast_scanner.py --path . --output sast-results.json
      - name: Analyze results
        run: |
          python security_tools/result_analyzer.py --bandit bandit-results.json --safety safety-results.json --sast sast-results.json
      - name: Upload security results
        uses: actions/upload-artifact@v3
        with:
          name: security-static-results
          path: |
            bandit-results.json
            safety-results.json
            sast-results.json

  # Dynamic analysis: deploy the app to a throwaway compose environment,
  # then scan it with OWASP ZAP and the custom DAST scanner.
  security-dynamic-analysis:
    runs-on: ubuntu-latest
    needs: security-static-analysis
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to test environment
        run: |
          # Deploy application to temporary test environment
          docker-compose -f docker-compose.test.yml up -d
          sleep 30 # Wait for application to start
      - name: Run ZAP baseline scan
        run: |
          docker run -t owasp/zap2docker-stable zap-baseline.py -t http://localhost:8080 -g gen.conf
      - name: Run custom DAST scan
        run: |
          python security_tools/custom_dast_scanner.py --target http://localhost:8080 --output dast-results.json
      - name: Upload DAST results
        uses: actions/upload-artifact@v3
        with:
          name: security-dynamic-results
          path: dast-results.json
      - name: Tear down test environment
        # Clean up even when a scan step fails.
        if: always()
        run: |
          docker-compose -f docker-compose.test.yml down

  # Software composition analysis: third-party dependency scanning.
  security-composition-analysis:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run Snyk security scan
        uses: snyk/actions/python@master
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
        with:
          args: --file=requirements.txt --json > snyk-results.json
      - name: Run Dependency-Check
        uses: dependency-check/Dependency-Check_Action@main
        id: depcheck
        with:
          project: 'MyProject'
          path: '.'
          format: 'JSON'
          out: 'reports'
      - name: Upload composition analysis results
        uses: actions/upload-artifact@v3
        with:
          name: security-composition-results
          path: |
            snyk-results.json
            reports/

  # Final gate: pull every artifact, enforce the security policy, publish a
  # dashboard, and comment a summary on pull requests.
  security-policy-check:
    runs-on: ubuntu-latest
    needs: [security-static-analysis, security-dynamic-analysis, security-composition-analysis]
    steps:
      - name: Download all security results
        uses: actions/download-artifact@v3
        with:
          path: security-results/
      - name: Check security policies
        run: |
          python security_tools/policy_checker.py --results-dir security-results/ --policy security-policy.json
      - name: Generate security dashboard
        run: |
          python security_tools/dashboard_generator.py --results-dir security-results/ --output security-dashboard.html
      - name: Upload security dashboard
        uses: actions/upload-artifact@v3
        with:
          name: security-dashboard
          path: security-dashboard.html
      - name: Post security summary to PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v6
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('security-results/security-static-results/bandit-results.json', 'utf8'));
            const comment = `Security Scan Results:\n\nCritical: ${results.metrics._totals.high} high severity issues found.`;
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
});
Security Testing Quality Gates
Quality Gate Implementation
PYTHON
# Example: Security testing quality gates
import json
from typing import Dict, Any, List
from dataclasses import dataclass
@dataclass
class QualityGateResult:
    """Outcome of a single security quality-gate evaluation."""
    # Whether the gate's configured thresholds were all satisfied.
    passed: bool
    # Per-finding violations plus any threshold ('policy_violation') entries.
    violations: List[Dict[str, Any]]
    # Counts keyed by severity category, plus 'quality_gate_passed'.
    summary: Dict[str, int]
class SecurityQualityGate:
    """
    Security quality gates for CI/CD pipelines.

    Each ``check_*_gate`` method evaluates one scanner's results against the
    thresholds in ``policy_config`` and returns a :class:`QualityGateResult`.

    Fixed: the original repeated the same count-and-threshold logic three
    times (static, dependency, dynamic); it is now shared in
    :meth:`_evaluate_gate`, so the three public gates differ only in their
    parsing and labelling configuration. Public signatures, violation dict
    shapes, threshold messages, and summary keys are unchanged.
    """

    def __init__(self, policy_config: Dict[str, Any]):
        # Thresholds such as 'max_critical_vulnerabilities'; missing keys
        # fall back to per-gate defaults in the check_*_gate methods.
        self.policy_config = policy_config

    def _evaluate_gate(self, findings: List[Dict[str, Any]], *,
                       critical_type: str, high_type: str,
                       detail_fields: List[tuple],
                       max_critical_key: str, max_critical_default: int,
                       max_high_key: str, max_high_default: int,
                       noun: str, summary_prefix: str) -> QualityGateResult:
        """
        Shared gate logic: count critical/high findings, record one violation
        per finding, then append 'policy_violation' entries when counts
        exceed the configured thresholds.

        Args:
            findings: Parsed scanner findings (dicts with a 'severity' key;
                missing severity defaults to 'medium').
            critical_type: 'type' label for critical per-finding violations.
            high_type: 'type' label for high per-finding violations.
            detail_fields: (violation_key, finding_key) pairs copied into
                each per-finding violation ('unknown' when absent).
            max_critical_key / max_critical_default: policy lookup for the
                critical threshold and its fallback value.
            max_high_key / max_high_default: same for the high threshold.
            noun: Human-readable finding noun used in threshold messages.
            summary_prefix: Suffix for the 'critical_*'/'high_*' summary keys.

        Returns:
            QualityGateResult with pass/fail, violations, and count summary.
        """
        violations: List[Dict[str, Any]] = []
        critical_count = 0
        high_count = 0
        for finding in findings:
            severity = finding.get('severity', 'medium').lower()
            if severity == 'critical':
                critical_count += 1
            elif severity == 'high':
                high_count += 1
            else:
                continue  # medium/low findings do not affect the gate
            violation = {
                'type': critical_type if severity == 'critical' else high_type,
                'severity': severity,
            }
            for violation_key, finding_key in detail_fields:
                violation[violation_key] = finding.get(finding_key, 'unknown')
            violations.append(violation)
        # Threshold checks against the policy (with per-gate defaults).
        max_critical = self.policy_config.get(max_critical_key, max_critical_default)
        max_high = self.policy_config.get(max_high_key, max_high_default)
        if critical_count > max_critical:
            violations.append({
                'type': 'policy_violation',
                'severity': 'critical',
                'description': f'Too many critical {noun}: {critical_count} > {max_critical}'
            })
        if high_count > max_high:
            violations.append({
                'type': 'policy_violation',
                'severity': 'high',
                'description': f'Too many high {noun}: {high_count} > {max_high}'
            })
        passed = critical_count <= max_critical and high_count <= max_high
        summary = {
            f'critical_{summary_prefix}': critical_count,
            f'high_{summary_prefix}': high_count,
            'quality_gate_passed': passed
        }
        return QualityGateResult(passed=passed, violations=violations, summary=summary)

    def check_static_analysis_gate(self, scan_results: Dict[str, Any]) -> QualityGateResult:
        """
        Check static analysis quality gate ('results' list) against the
        code-vulnerability thresholds.
        """
        return self._evaluate_gate(
            scan_results.get('results', []),
            critical_type='critical_finding',
            high_type='high_finding',
            detail_fields=[('location', 'location'), ('description', 'description')],
            max_critical_key='max_critical_vulnerabilities', max_critical_default=0,
            max_high_key='max_high_vulnerabilities', max_high_default=5,
            noun='vulnerabilities', summary_prefix='vulnerabilities')

    def check_dependency_analysis_gate(self, scan_results: Dict[str, Any]) -> QualityGateResult:
        """
        Check dependency analysis quality gate ('vulnerabilities' list)
        against the dependency thresholds.
        """
        return self._evaluate_gate(
            scan_results.get('vulnerabilities', []),
            critical_type='critical_dependency_vulnerability',
            high_type='high_dependency_vulnerability',
            detail_fields=[('package', 'package'), ('cve', 'id'), ('description', 'title')],
            max_critical_key='max_critical_dependency_vulnerabilities', max_critical_default=0,
            max_high_key='max_high_dependency_vulnerabilities', max_high_default=3,
            noun='dependency vulnerabilities', summary_prefix='dependency_vulnerabilities')

    def check_dynamic_analysis_gate(self, scan_results: Dict[str, Any]) -> QualityGateResult:
        """
        Check dynamic analysis quality gate ('issues' list) against the
        runtime thresholds.
        """
        return self._evaluate_gate(
            scan_results.get('issues', []),
            critical_type='critical_runtime_vulnerability',
            high_type='high_runtime_vulnerability',
            detail_fields=[('url', 'url'), ('description', 'description')],
            max_critical_key='max_critical_dynamic_vulnerabilities', max_critical_default=0,
            max_high_key='max_high_dynamic_vulnerabilities', max_high_default=2,
            noun='dynamic vulnerabilities', summary_prefix='dynamic_vulnerabilities')

    def run_all_quality_gates(self, scan_results: Dict[str, Any]) -> Dict[str, QualityGateResult]:
        """
        Run every gate whose input section is present in scan_results.
        """
        gate_checks = {
            'static_analysis': self.check_static_analysis_gate,
            'dependency_analysis': self.check_dependency_analysis_gate,
            'dynamic_analysis': self.check_dynamic_analysis_gate,
        }
        return {name: check(scan_results[name])
                for name, check in gate_checks.items()
                if name in scan_results}

    def overall_quality_gate_passed(self, gate_results: Dict[str, QualityGateResult]) -> bool:
        """
        Check if overall quality gate passes: every individual gate must
        pass (vacuously True when no gates ran).
        """
        return all(result.passed for result in gate_results.values())
# Example usage
def run_security_quality_gates():
"""
Example of running security quality gates
"""
# Define security policy
security_policy = {
'max_critical_vulnerabilities': 0,
'max_high_vulnerabilities': 5,
'max_critical_dependency_vulnerabilities': 0,
'max_high_dependency_vulnerabilities': 3,
'max_critical_dynamic_vulnerabilities': 0,
'max_high_dynamic_vulnerabilities': 2
}
# Initialize quality gate checker
quality_gate = SecurityQualityGate(security_policy)
# Example scan results (in real implementation, these would come from actual scanners)
scan_results = {
'static_analysis': {
'results': [
{
'severity': 'high',
'location': 'app/models/user.py:45',
'description': 'SQL injection vulnerability'
},
{
'severity': 'medium',
'location': 'app/views/auth.py:123',
'description': 'Weak password policy'
}
]
},
'dependency_analysis': {
'vulnerabilities': [
{
'severity': 'high',
'package': 'django',
'id': 'CVE-2023-1234',
'title': 'Django security issue'
}
]
},
'dynamic_analysis': {
'issues': [
{
'severity': 'medium',
'url': 'https://test.example.com/login',
'description': 'Missing security headers'
}
]
}
}
# Run all quality gates
gate_results = quality_gate.run_all_quality_gates(scan_results)
# Check overall result
overall_passed = quality_gate.overall_quality_gate_passed(gate_results)
print(f"Overall quality gate passed: {overall_passed}")
# Print individual results
for gate_name, result in gate_results.items():
print(f"{gate_name}: {'PASS' if result.passed else 'FAIL'}")
print(f" Summary: {result