CloudTadaInsights

DevSecOps Security Testing and Assurance

DevSecOps Security Testing and Assurance

Overview

DevSecOps security testing and assurance integrate security validation throughout the software development lifecycle, ensuring that security is not an afterthought but an integral part of the development process. This article explores how to implement comprehensive security testing strategies that provide continuous security assurance while maintaining development velocity.

Security Testing in the Development Lifecycle

Shift-Left Security Testing

Early Integration of Security Tests

Security testing begins at the earliest stages of development, moving security validation left in the development pipeline:

PYTHON
# Example: Security testing framework integration
import unittest
from unittest.mock import patch, MagicMock
import requests
from typing import Dict, List, Any
import json
from datetime import datetime

class SecurityTestFramework:
    """
    Framework for integrating security tests throughout the development lifecycle.

    Test callables are registered per category and later executed by the
    ``run_*`` methods. Each run returns one result record per (test, target)
    pair that either reported findings or raised an exception.
    """

    def __init__(self):
        # Registered tests, grouped by category name
        self.test_categories = {
            'static_analysis': [],
            'dynamic_analysis': [],
            'dependency_scanning': [],
            'configuration_testing': [],
            'runtime_security': []
        }
        self.test_results = []
        self.vulnerability_db = self.load_vulnerability_database()

    def load_vulnerability_database(self) -> Dict[str, Any]:
        """
        Load known vulnerabilities database.

        Returns:
            A dict with empty ``cwe``, ``cve`` and ``owasp_top_10`` sections.
        """
        return {
            'cwe': {},  # Common Weakness Enumeration
            'cve': {},  # Common Vulnerabilities and Exposures
            'owasp_top_10': {}  # OWASP Top 10 vulnerabilities
        }

    def register_static_analysis_test(self, test_func, severity='medium'):
        """
        Register a static analysis security test.

        Args:
            test_func: Callable invoked with a file path; a truthy return
                value is treated as findings.
            severity: Severity recorded on results produced by this test.
        """
        self._register('static_analysis', test_func, severity)

    def register_dynamic_analysis_test(self, test_func, severity='high'):
        """
        Register a dynamic analysis security test.

        Args:
            test_func: Callable invoked with a target URL; a truthy return
                value is treated as findings.
            severity: Severity recorded on results produced by this test.
        """
        self._register('dynamic_analysis', test_func, severity)

    def run_static_analysis_tests(self, code_files: List[str]) -> List[Dict[str, Any]]:
        """
        Run all registered static analysis tests against every file.

        Args:
            code_files: Paths handed one at a time to each registered test.

        Returns:
            Result records keyed by ``test_name``, ``file_path``, ``severity``,
            ``findings`` and ``timestamp``. A test that raises yields a record
            with severity ``'error'`` instead of aborting the run.
        """
        results = []

        for test_config in self.test_categories['static_analysis']:
            for file_path in code_files:
                record = self._execute(test_config, 'file_path', file_path)
                if record is not None:
                    results.append(record)

        return results

    def run_dynamic_analysis_tests(self, target_url: str) -> List[Dict[str, Any]]:
        """
        Run all registered dynamic analysis tests against one URL.

        Args:
            target_url: URL handed to each registered test.

        Returns:
            Result records keyed by ``test_name``, ``target_url``, ``severity``,
            ``findings`` and ``timestamp``. A test that raises yields a record
            with severity ``'error'`` instead of aborting the run.
        """
        results = []

        for test_config in self.test_categories['dynamic_analysis']:
            record = self._execute(test_config, 'target_url', target_url)
            if record is not None:
                results.append(record)

        return results

    def _register(self, category: str, test_func, severity: str) -> None:
        """Append a test entry to the given category."""
        self.test_categories[category].append({
            'func': test_func,
            'severity': severity,
            'registered_at': datetime.utcnow()
        })

    @staticmethod
    def _test_name(test_func) -> str:
        """Return a printable name for a test callable."""
        # functools.partial objects and callable instances have no __name__;
        # fall back to repr() so reporting never raises.
        return getattr(test_func, '__name__', None) or repr(test_func)

    def _execute(self, test_config: Dict[str, Any], target_key: str, target: str):
        """Run one test against one target; return a result record or None."""
        try:
            findings = test_config['func'](target)
            if not findings:
                # The test ran cleanly and found nothing to report
                return None
            severity = test_config['severity']
        except Exception as e:
            # A broken test must not stop the remaining tests from running
            severity = 'error'
            findings = f'Test execution error: {str(e)}'

        return {
            'test_name': self._test_name(test_config['func']),
            target_key: target,
            'severity': severity,
            'findings': findings,
            'timestamp': datetime.utcnow().isoformat()
        }

# Example security test functions
def test_sql_injection_static(file_path: str) -> List[Dict[str, Any]]:
    """
    Static analysis test for SQL injection vulnerabilities
    """
    findings = []
    
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        lines = f.readlines()
    
    for i, line in enumerate(lines, 1):
        # Look for common SQL injection patterns
        if any(pattern in line.lower() for pattern in ['select * from', 'insert into', 'update ', 'delete from']):
            if any(dangerous in line for dangerous in ['+ "', '+ variable', '+ input']):
                findings.append({
                    'line_number': i,
                    'code_snippet': line.strip(),
                    'vulnerability_type': 'SQL Injection',
                    'severity': 'high'
                })
    
    return findings

def test_xss_static(file_path: str) -> List[Dict[str, Any]]:
    """
    Static analysis test for XSS vulnerabilities
    """
    findings = []
    
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        lines = f.readlines()
    
    for i, line in enumerate(lines, 1):
        # Look for common XSS patterns
        if 'innerHTML' in line or 'document.write' in line:
            findings.append({
                'line_number': i,
                'code_snippet': line.strip(),
                'vulnerability_type': 'Cross-Site Scripting',
                'severity': 'high'
            })
        
        if 'eval(' in line or 'Function(' in line:
            findings.append({
                'line_number': i,
                'code_snippet': line.strip(),
                'vulnerability_type': 'Code Injection',
                'severity': 'critical'
            })
    
    return findings

def test_authentication_bypass(target_url: str) -> List[Dict[str, Any]]:
    """
    Dynamic test for authentication bypass.

    Issues one GET per forged header set and reports every response whose
    status is neither 401 nor 403. Requests that fail with a
    ``requests.RequestException`` are skipped.

    Returns:
        One dict per suspicious response with ``test_type``, ``headers_used``,
        ``response_status`` and ``severity``.
    """
    # Header sets imitating common authentication bypass techniques
    forged_header_sets = (
        {'Cookie': 'auth=admin; role=admin'},
        {'Authorization': 'Bearer invalid_token'},
        {'X-Forwarded-For': '127.0.0.1'},
    )
    rejected_statuses = (401, 403)  # unauthorized / forbidden

    suspicious = []
    for forged_headers in forged_header_sets:
        try:
            reply = requests.get(target_url, headers=forged_headers, timeout=10)
        except requests.RequestException:
            continue

        if reply.status_code in rejected_statuses:
            continue

        suspicious.append({
            'test_type': 'Authentication Bypass',
            'headers_used': forged_headers,
            'response_status': reply.status_code,
            'severity': 'critical'
        })

    return suspicious

def test_sql_injection_dynamic(target_url: str) -> List[Dict[str, Any]]:
    """
    Dynamic test for SQL injection vulnerabilities.

    Sends every payload in every candidate query parameter via GET and
    reports a finding when the lower-cased response body contains one of the
    error indicators. At most one finding is reported per parameter. Requests
    that fail with a ``requests.RequestException`` are skipped.

    Args:
        target_url: Endpoint to probe; it may already carry a query string.

    Returns:
        One dict per vulnerable-looking parameter with ``test_type``,
        ``parameter``, ``payload_used``, ``response_status``,
        ``error_indicators_found`` and ``severity``.
    """
    findings = []

    # Common SQL injection payloads
    sql_payloads = [
        "' OR '1'='1",
        "' UNION SELECT * FROM users--",
        "'; DROP TABLE users--",
        "admin'--",
        "1' AND 1=1--"
    ]

    # Test parameters commonly vulnerable to SQL injection
    test_params = ['id', 'user', 'email', 'search', 'filter']

    # Substrings in the response body that hint at a leaked database error
    error_indicators = [
        'sql syntax', 'mysql', 'postgresql', 'sqlite', 'oracle',
        'error', 'exception', 'traceback', 'at line'
    ]

    for param in test_params:
        for payload in sql_payloads:
            try:
                # Let requests build the query string: the payload is
                # percent-encoded (quotes, ';', '=' and spaces included) and
                # merged correctly when target_url already has a query.
                response = requests.get(target_url, params={param: payload}, timeout=10)
            except requests.RequestException:
                continue

            response_text = response.text.lower()
            matched_indicators = [
                indicator for indicator in error_indicators if indicator in response_text
            ]
            if matched_indicators:
                findings.append({
                    'test_type': 'SQL Injection',
                    'parameter': param,
                    'payload_used': payload,
                    'response_status': response.status_code,
                    'error_indicators_found': matched_indicators,
                    'severity': 'high'
                })
                break  # Found vulnerability, move to next parameter

    return findings

# Shared framework instance used by the CI/CD example
security_framework = SecurityTestFramework()

# Static checks receive a file path when they run
security_framework.register_static_analysis_test(
    test_func=test_sql_injection_static, severity='high'
)
security_framework.register_static_analysis_test(
    test_func=test_xss_static, severity='high'
)

# Dynamic checks receive the target URL when they run
security_framework.register_dynamic_analysis_test(
    test_func=test_authentication_bypass, severity='critical'
)
security_framework.register_dynamic_analysis_test(
    test_func=test_sql_injection_dynamic, severity='high'
)

# Example usage in development workflow
def run_security_tests_in_ci_cd():
    """
    Example of running security tests in CI/CD pipeline.

    Runs the registered static tests over a fixed file list and the registered
    dynamic tests against a fixed test-environment URL, printing progress.

    Returns:
        False when any result has severity 'critical' or 'error' (deployment
        is blocked), True otherwise.
    """
    print("Running security tests...")

    # Static analysis over the application sources
    static_results = security_framework.run_static_analysis_tests(
        ['app/main.py', 'app/auth.py', 'app/models.py']
    )
    print(f"Static analysis found {len(static_results)} issues")

    # Dynamic analysis against the test environment
    dynamic_results = security_framework.run_dynamic_analysis_tests(
        "http://test-env.company.com/api/users"
    )
    print(f"Dynamic analysis found {len(dynamic_results)} issues")

    # Only these severities block a deployment
    blocking_levels = ('critical', 'error')
    critical_issues = [
        entry
        for entry in static_results + dynamic_results
        if entry['severity'] in blocking_levels
    ]

    if not critical_issues:
        print("No critical security issues found. Proceeding with deployment.")
        return True

    print(f"Critical security issues found: {len(critical_issues)}")
    print("Blocking deployment due to critical security issues")
    return False

# Run the example
# run_security_tests_in_ci_cd()

Pre-Commit Security Hooks

Implementing security checks before code commits:

BASH
#!/bin/bash
# .git/hooks/pre-commit - Security pre-commit hook
#
# Runs a series of security checks for the files staged for commit and aborts
# the commit (exit 1) as soon as one of them fails. Scanners that are not
# installed (bandit, eslint, pip-audit, npm) are skipped.

echo "Running pre-commit security checks..."

# Collect the staged files once. --diff-filter=ACMR leaves out deletions
# (nothing to scan) and -z keeps paths with spaces or newlines intact.
STAGED_FILES=()
while IFS= read -r -d '' staged_file; do
    STAGED_FILES+=("$staged_file")
done < <(git diff --cached --name-only -z --diff-filter=ACMR)

# Sort the staged files into the lists used by the checks below
SECRET_SCAN_FILES=()
CHANGED_PYTHON_FILES=()
CHANGED_JS_FILES=()
for staged_file in "${STAGED_FILES[@]}"; do
    case "$staged_file" in
        *.py) CHANGED_PYTHON_FILES+=("$staged_file") ;;
        *.js) CHANGED_JS_FILES+=("$staged_file") ;;
    esac
    # grep's --include accepts a single glob, not a comma-separated list, so
    # the extension filter is applied here instead.
    case "$staged_file" in
        *.py|*.js|*.java|*.json|*.yaml|*.yml|*.xml|*.env)
            SECRET_SCAN_FILES+=("$staged_file") ;;
    esac
done

# Check for secrets in committed code
# The staged blob (":<path>") is inspected rather than the working-tree file,
# so the check sees exactly what is about to be committed.
SECRETS_FOUND=0
for staged_file in "${SECRET_SCAN_FILES[@]}"; do
    if git show ":$staged_file" 2>/dev/null | grep -q -i -E "(password|secret|key|token|private)"; then
        echo "$staged_file"
        SECRETS_FOUND=1
    fi
done
if [ "$SECRETS_FOUND" -ne 0 ]; then
    echo "ERROR: Potential secrets detected in committed code!"
    echo "Please remove sensitive information before committing."
    exit 1
fi

# Run static analysis on changed files
if [ "${#CHANGED_PYTHON_FILES[@]}" -gt 0 ]; then
    echo "Running Python security analysis on: ${CHANGED_PYTHON_FILES[*]}"

    # Run bandit security analyzer
    if command -v bandit &> /dev/null; then
        if ! bandit -r "${CHANGED_PYTHON_FILES[@]}"; then
            echo "Bandit security scan failed. Please address the issues."
            exit 1
        fi
    fi
fi

# Check for common security issues in JavaScript
if [ "${#CHANGED_JS_FILES[@]}" -gt 0 ]; then
    echo "Running JavaScript security analysis on: ${CHANGED_JS_FILES[*]}"

    # Run eslint with security rules.
    # No --fix here: it would rewrite working-tree files without re-staging
    # them, so the commit would still contain the unfixed content.
    if command -v eslint &> /dev/null; then
        if ! eslint --quiet "${CHANGED_JS_FILES[@]}"; then
            echo "ESLint security scan failed. Please address the issues."
            exit 1
        fi
    fi
fi

# Check for dependency vulnerabilities
if [ -f "requirements.txt" ] || [ -f "package-lock.json" ] || [ -f "Gemfile.lock" ]; then
    echo "Checking for vulnerable dependencies..."

    # For Python projects
    if [ -f "requirements.txt" ]; then
        if command -v pip-audit &> /dev/null; then
            if ! pip-audit -r requirements.txt; then
                echo "Dependency vulnerability scan failed. Please update vulnerable packages."
                exit 1
            fi
        fi
    fi

    # For Node.js projects
    if [ -f "package-lock.json" ]; then
        if command -v npm &> /dev/null; then
            if ! npm audit --audit-level=high; then
                echo "NPM dependency audit failed. Please address vulnerabilities."
                exit 1
            fi
        fi
    fi
fi

echo "All security checks passed. Commit approved."
exit 0

Continuous Security Testing Pipeline

Security Testing as Code

Implementing security tests as part of the application codebase:

PYTHON
# security_tests/application_security_tests.py
import unittest
import requests
from unittest.mock import patch, MagicMock
import json
from typing import Dict, Any

class ApplicationSecurityTests(unittest.TestCase):
    """
    Security tests for the application.

    Every test talks over HTTP to the deployment at ``base_url``. All requests
    go through one shared session and use a common timeout.
    """

    # Seconds to wait per request, so an unresponsive target fails the test
    # instead of hanging the pipeline.
    REQUEST_TIMEOUT = 10

    def setUp(self):
        """
        Set up test environment
        """
        self.base_url = "http://test-app.company.com"
        self.session = requests.Session()
        # Release the session's pooled connections after each test
        self.addCleanup(self.session.close)

    def test_input_validation(self):
        """
        Test input validation for common attack vectors
        """
        # SQL injection and XSS probes
        malicious_inputs = [
            "' OR '1'='1",
            "'; DROP TABLE users--",
            "admin'--",
            "<script>alert('xss')</script>"
        ]

        for malicious_input in malicious_inputs:
            with self.subTest(input=malicious_input):
                response = self.session.post(
                    f"{self.base_url}/api/search",
                    json={'query': malicious_input},
                    timeout=self.REQUEST_TIMEOUT
                )

                # Should not return any 5xx error (500, 502, 503, ...)
                self.assertLess(response.status_code, 500)

                # Should not contain database error messages
                body = response.text.lower()
                self.assertNotIn('sql', body)
                self.assertNotIn('database', body)

    def test_authentication_required_endpoints(self):
        """
        Test that protected endpoints require authentication
        """
        protected_endpoints = [
            '/api/admin/users',
            '/api/admin/config',
            '/api/user/profile',
            '/api/user/settings'
        ]

        for endpoint in protected_endpoints:
            with self.subTest(endpoint=endpoint):
                # Try to access without authentication
                response = self.session.get(
                    f"{self.base_url}{endpoint}",
                    timeout=self.REQUEST_TIMEOUT
                )

                # Should return 401 or 403
                self.assertIn(response.status_code, [401, 403])

    def test_authorization_boundaries(self):
        """
        Test that users cannot access resources they shouldn't
        """
        # Test user trying to access another user's data
        response = self.session.get(
            f"{self.base_url}/api/user/other-user-id/profile",
            headers={'Authorization': 'Bearer valid-user-token'},
            timeout=self.REQUEST_TIMEOUT
        )

        # Should return 403 Forbidden
        self.assertEqual(response.status_code, 403)

    def test_csrf_protection(self):
        """
        Test CSRF protection mechanisms
        """
        # Try to make state-changing request without CSRF token
        response = self.session.post(
            f"{self.base_url}/api/user/profile",
            data={'name': 'New Name'},
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            timeout=self.REQUEST_TIMEOUT
        )

        # Should be rejected
        self.assertIn(response.status_code, [400, 401, 403])

    def test_rate_limiting(self):
        """
        Test rate limiting protections
        """
        # Make multiple requests rapidly; stop at the first 429
        for _ in range(100):
            response = self.session.get(
                f"{self.base_url}/api/public/endpoint",
                timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 429:  # Too Many Requests
                break

        # If the last response is not a 429, the limit was never hit
        self.assertEqual(response.status_code, 429, "Rate limiting not properly implemented")

    def test_secure_headers(self):
        """
        Test that security headers are properly set
        """
        response = self.session.get(f"{self.base_url}/", timeout=self.REQUEST_TIMEOUT)

        # Check for important security headers
        security_headers = [
            'X-Frame-Options',
            'X-Content-Type-Options',
            'X-XSS-Protection',
            'Strict-Transport-Security'
        ]

        for header in security_headers:
            with self.subTest(header=header):
                self.assertIn(header, response.headers, f"Missing security header: {header}")

    def test_content_security_policy(self):
        """
        Test Content Security Policy implementation
        """
        response = self.session.get(f"{self.base_url}/", timeout=self.REQUEST_TIMEOUT)

        csp_header = response.headers.get('Content-Security-Policy', '')

        # Should have CSP header
        self.assertTrue(csp_header, "Content-Security-Policy header not found")

        # Should not allow 'unsafe-inline' or 'unsafe-eval'
        self.assertNotIn("'unsafe-inline'", csp_header.lower())
        self.assertNotIn("'unsafe-eval'", csp_header.lower())

# security_tests/infrastructure_security_tests.py
import boto3
import unittest
from unittest.mock import patch, MagicMock
from moto import mock_ec2, mock_s3, mock_iam

class InfrastructureSecurityTests(unittest.TestCase):
    """
    Security tests for infrastructure components.

    Each test runs inside the moto mock named by its decorator.
    """

    # Ports that must not be reachable from every address
    SENSITIVE_PORTS = (22, 3389, 80, 443)  # SSH, RDP, HTTP, HTTPS

    @staticmethod
    def _is_open_to_world(permission):
        """Return True when the rule admits every IPv4 or every IPv6 address."""
        ipv4_open = any(
            ip_range.get('CidrIp') == '0.0.0.0/0'
            for ip_range in permission.get('IpRanges', [])
        )
        ipv6_open = any(
            ip_range.get('CidrIpv6') == '::/0'
            for ip_range in permission.get('Ipv6Ranges', [])
        )
        return ipv4_open or ipv6_open

    @classmethod
    def _exposed_sensitive_ports(cls, permission):
        """Return the sensitive ports that fall inside the rule's port range."""
        # IpProtocol '-1' means all protocols, which covers every port
        if permission.get('IpProtocol') == '-1':
            return list(cls.SENSITIVE_PORTS)

        from_port = permission.get('FromPort')
        to_port = permission.get('ToPort')
        if from_port is None or to_port is None:
            return []

        # Compare against the whole range, not just its first port, so a rule
        # such as 0-65535 is caught as well.
        return [port for port in cls.SENSITIVE_PORTS if from_port <= port <= to_port]

    @mock_ec2
    def test_security_group_restrictions(self):
        """
        Test that security groups are properly restrictive
        """
        ec2 = boto3.resource('ec2', region_name='us-east-1')

        # Create a test security group
        sg = ec2.create_security_group(
            GroupName='test-sg',
            Description='Test security group for security testing'
        )

        # A rule open to every address on a sensitive port is a security
        # misconfiguration
        for permission in sg.ip_permissions:
            if not self._is_open_to_world(permission):
                continue
            exposed_ports = self._exposed_sensitive_ports(permission)
            if exposed_ports:
                self.fail(f"Security group allows all IPs on sensitive port(s) {exposed_ports}")

    @mock_s3
    def test_s3_bucket_security(self):
        """
        Test S3 bucket security configurations
        """
        s3 = boto3.client('s3', region_name='us-east-1')

        # Create a test bucket
        bucket_name = 'test-security-bucket'
        s3.create_bucket(Bucket=bucket_name)

        # Check that public access is blocked
        # NOTE(review): no put_public_access_block call precedes this lookup;
        # confirm the mock returns a configuration for a fresh bucket rather
        # than raising.
        public_access_block = s3.get_public_access_block(Bucket=bucket_name)

        block_config = public_access_block['PublicAccessBlockConfiguration']
        self.assertTrue(block_config['BlockPublicAcls'])
        self.assertTrue(block_config['BlockPublicPolicy'])
        self.assertTrue(block_config['IgnorePublicAcls'])
        self.assertTrue(block_config['RestrictPublicBuckets'])

    @mock_iam
    def test_iam_policy_restrictions(self):
        """
        Test IAM policy security
        """
        iam = boto3.client('iam', region_name='us-east-1')

        # Create a test user
        user_name = 'test-user'
        iam.create_user(UserName=user_name)

        # Attach a test policy
        policy_doc = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": "s3:GetObject",
                    "Resource": "*"
                }
            ]
        }

        policy_name = 'test-policy'
        policy_arn = iam.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(policy_doc)
        )['Policy']['Arn']

        iam.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)

        # Test that overly permissive policies are flagged
        attached_policies = iam.list_attached_user_policies(UserName=user_name)

        for policy in attached_policies['AttachedPolicies']:
            # Separate name so the policy document defined above is not shadowed
            policy_details = iam.get_policy(PolicyArn=policy['PolicyArn'])
            # In a real test, we'd analyze the policy document for security issues
            # For this example, we'll just ensure the policy exists
            self.assertIsNotNone(policy_details)

# Run security tests as part of CI/CD
def run_security_test_suite():
    """
    Run the complete security test suite.

    Loads the application tests followed by the infrastructure tests into one
    suite and runs it with a verbose text runner.

    Returns:
        True when every test passed, False otherwise.
    """
    loader = unittest.TestLoader()

    # Application tests first, infrastructure tests second
    test_cases = (ApplicationSecurityTests, InfrastructureSecurityTests)
    full_suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test_case) for test_case in test_cases]
    )

    outcome = unittest.TextTestRunner(verbosity=2).run(full_suite)
    return outcome.wasSuccessful()

if __name__ == '__main__':
    # Exit status 0 on success and 1 on failure, so a CI job can gate on it.
    success = run_security_test_suite()
    # Raise SystemExit directly: the bare exit() helper is injected by the
    # site module for interactive use and is not guaranteed to exist
    # (e.g. under `python -S`).
    raise SystemExit(0 if success else 1)

Security Testing Types and Techniques

Static Application Security Testing (SAST)

SAST Implementation and Best Practices

PYTHON
# Example: Custom SAST scanner implementation
import ast
import re
from typing import List, Dict, Any
from pathlib import Path
import json

class CustomSASTScanner:
    """
    Custom Static Application Security Testing scanner.

    Combines line-based regular-expression rules with a few AST checks for
    Python sources. Every finding is a plain dict carrying at least ``rule``,
    ``severity``, ``description``, ``file_path``, ``line_number``,
    ``code_snippet`` and ``timestamp``.
    """

    # Builtin calls reported by the AST pass: name -> (rule, description)
    _DANGEROUS_CALLS = {
        'eval': ('dangerous_eval', 'Use of eval() function is dangerous'),
        'exec': ('dangerous_exec', 'Use of exec() function is dangerous'),
    }

    def __init__(self):
        self.rules = self.load_security_rules()
        self.findings = []

    @staticmethod
    def _current_timestamp() -> str:
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
        # Imported locally so the scanner adds no module-level dependency
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    def load_security_rules(self) -> Dict[str, Any]:
        """
        Load security rules for scanning.

        Returns:
            Mapping of rule name to a dict with ``patterns`` (regular
            expressions applied per line, case-insensitively), ``severity``
            and ``description``.
        """
        return {
            'sql_injection': {
                'patterns': [
                    r'(cursor\.execute|\.query|\.find)\([^)]*[\+\%]\s*[\'"]',
                    r'(select|insert|update|delete)\s+.*\s+where\s+.*\s*[=\+\%]',
                ],
                'severity': 'high',
                'description': 'Potential SQL injection vulnerability'
            },
            'xss': {
                'patterns': [
                    r'document\.write\(',
                    r'innerHTML\s*=',
                    r'outerHTML\s*=',
                    r'eval\(',
                    r'Function\('
                ],
                'severity': 'high',
                'description': 'Potential Cross-Site Scripting vulnerability'
            },
            'hardcoded_credentials': {
                'patterns': [
                    r'(password|secret|key|token|auth)[\s=:]+["\'][^"\']+["\']',
                    # Reading a secret from the environment is not a hardcoded
                    # credential; only flag lookups that carry a literal
                    # fallback value, e.g. os.getenv("DB_PASSWORD", "hunter2").
                    r'(os\.environ\.get|os\.getenv)\(\s*["\'][^"\']*(password|secret|key|token|auth)[^"\']*["\']\s*,\s*["\'][^"\']+["\']',
                ],
                'severity': 'critical',
                'description': 'Hardcoded credentials detected'
            },
            'path_traversal': {
                'patterns': [
                    r'os\.path\.join\([^)]*\+\s*[\'"][\.\.\/]',
                    r'open\([^)]*[\+\%]\s*[\'"][\.\.\/]',
                ],
                'severity': 'high',
                'description': 'Potential path traversal vulnerability'
            }
        }

    def scan_python_file(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Scan a Python file for security vulnerabilities.

        Applies every regex rule to every line, then runs the AST checks when
        the file parses. A file that cannot be parsed is still pattern-scanned.

        Args:
            file_path: Path of the file to scan.

        Returns:
            The findings for this file. If scanning itself fails (for example
            the file cannot be opened), a single ``scan_error`` finding with
            severity ``'error'`` is appended instead of raising.
        """
        findings = []
        scan_time = self._current_timestamp()

        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            lines = content.split('\n')

            # Parse the AST to understand code structure
            try:
                tree = ast.parse(content)
            except (SyntaxError, ValueError):
                # Unparseable source (some Python versions raise ValueError,
                # e.g. for null bytes): still scan for patterns
                tree = None

            # Check each rule
            for rule_name, rule_config in self.rules.items():
                for pattern in rule_config['patterns']:
                    compiled_pattern = re.compile(pattern, re.IGNORECASE)

                    for line_num, line in enumerate(lines, 1):
                        for match in compiled_pattern.finditer(line):
                            findings.append({
                                'rule': rule_name,
                                'severity': rule_config['severity'],
                                'description': rule_config['description'],
                                'file_path': file_path,
                                'line_number': line_num,
                                'column_start': match.start(),
                                'column_end': match.end(),
                                'code_snippet': line.strip(),
                                'pattern_matched': match.group(0),
                                'timestamp': scan_time
                            })

            # Additional AST-based analysis
            if tree is not None:
                findings.extend(self.analyze_ast(tree, file_path, lines))

        except Exception as e:
            findings.append({
                'rule': 'scan_error',
                'severity': 'error',
                'description': f'Scan error: {str(e)}',
                'file_path': file_path,
                'line_number': 0,
                'code_snippet': '',
                'timestamp': scan_time
            })

        return findings

    def analyze_ast(self, tree: ast.AST, file_path: str, lines: List[str]) -> List[Dict[str, Any]]:
        """
        Analyze Python AST for security issues.

        Reports direct calls to ``eval``/``exec`` and ``from os import system``.

        Args:
            tree: Parsed module to walk.
            file_path: Path recorded on each finding.
            lines: Source lines, used to extract the code snippet.

        Returns:
            The findings produced by the AST checks.
        """
        findings = []
        scan_time = self._current_timestamp()

        def record(node, rule, severity, description):
            # Build one finding for the given node
            line_num = node.lineno
            findings.append({
                'rule': rule,
                'severity': severity,
                'description': description,
                'file_path': file_path,
                'line_number': line_num,
                'column_start': getattr(node, 'col_offset', 0),
                'code_snippet': lines[line_num - 1].strip() if line_num <= len(lines) else '',
                'timestamp': scan_time
            })

        for node in ast.walk(tree):
            # Check for eval/exec usage (only plain-name calls such as eval(x))
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                dangerous_call = self._DANGEROUS_CALLS.get(node.func.id)
                if dangerous_call is not None:
                    rule, description = dangerous_call
                    record(node, rule, 'critical', description)

            # Check for dangerous imports
            if isinstance(node, ast.ImportFrom) and node.module == 'os':
                for alias in node.names:
                    if alias.name == 'system':
                        record(
                            node,
                            'dangerous_import',
                            'high',
                            'Import of dangerous os.system() function'
                        )

        return findings

    def scan_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """
        Scan an entire directory for security issues.

        Args:
            directory_path: Root directory; ``*.py`` files are found recursively.

        Returns:
            The findings of all scanned files, in sorted path order.
        """
        all_findings = []

        # Sorted so the report order is stable between runs
        for py_file in sorted(Path(directory_path).rglob("*.py")):
            all_findings.extend(self.scan_python_file(str(py_file)))

        return all_findings

    def generate_report(self, findings: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Generate a security report from findings.

        Args:
            findings: Finding dicts as produced by the scan methods.

        Returns:
            A dict with ``scan_timestamp``, ``total_findings``,
            ``findings_by_severity``, ``findings_by_rule`` and
            ``detailed_findings``.
        """
        severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0, 'error': 0}
        rule_counts = {}

        # Single pass over the findings for both tallies
        for finding in findings:
            if finding['severity'] in severity_counts:
                severity_counts[finding['severity']] += 1
            rule_counts[finding['rule']] = rule_counts.get(finding['rule'], 0) + 1

        return {
            'scan_timestamp': self._current_timestamp(),
            'total_findings': len(findings),
            'findings_by_severity': severity_counts,
            'findings_by_rule': rule_counts,
            'detailed_findings': findings
        }

# Example usage of the SAST scanner
def run_custom_sast_scan():
    """
    Example of running custom SAST scan.

    Scans ``./src/``, prints the finding counts and writes the full report to
    ``sast-report.json`` in the current directory.

    Returns:
        The report dict produced by the scanner.
    """
    sast = CustomSASTScanner()

    # Scan the source tree and build the report in one go
    report = sast.generate_report(sast.scan_directory("./src/"))

    # Console summary
    print(f"Total findings: {report['total_findings']}")
    print(f"Critical: {report['findings_by_severity']['critical']}")
    print(f"High: {report['findings_by_severity']['high']}")
    print(f"Medium: {report['findings_by_severity']['medium']}")
    print(f"Low: {report['findings_by_severity']['low']}")

    # Persist the detailed report
    serialized = json.dumps(report, indent=2)
    with open('sast-report.json', 'w') as report_file:
        report_file.write(serialized)

    return report

# Example vulnerable code to test the scanner.
# This is sample source text held in a string, not code that runs here: it is
# written to disk and scanned below. It deliberately contains the kinds of
# issues the scanner rules look for (string-built SQL, document.write, eval,
# a hardcoded API key).
vulnerable_code_example = '''
import os
import sqlite3

def get_user_data(user_id):
    conn = sqlite3.connect("users.db")
    cursor = conn.cursor()
    
    # Vulnerable to SQL injection
    query = "SELECT * FROM users WHERE id = " + user_id
    cursor.execute(query)
    
    return cursor.fetchall()

def display_message(message):
    # Vulnerable to XSS
    html = "<div>" + message + "</div>"
    document.write(html)

def dangerous_function(user_input):
    # Dangerous eval usage
    result = eval(user_input)
    return result

# Hardcoded credentials
API_KEY = "sk-very-secret-key-do-not-commit"

def path_traversal(filename):
    # Vulnerable to path traversal
    filepath = os.path.join("/safe/directory/", filename)
    with open(filepath, "r") as f:
        return f.read()
'''

# Save example to test file
with open('test_vulnerable.py', 'w') as f:
    f.write(vulnerable_code_example)

# Run the scanner on the test file
scanner = CustomSASTScanner()
findings = scanner.scan_python_file('test_vulnerable.py')
print(f"Findings in test file: {len(findings)}")

for finding in findings:
    print(f"Rule: {finding['rule']}, Severity: {finding['severity']}, Line: {finding['line_number']}")

Dynamic Application Security Testing (DAST)

DAST Implementation and Automation

PYTHON
# Example: DAST scanner implementation
import requests
import urllib3
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import re
from typing import List, Dict, Any, Set
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

# The scanner below sends its requests with verify=False; silence the
# InsecureRequestWarning that urllib3 would otherwise emit for them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class DASTScanner:
    """
    Dynamic Application Security Testing scanner

    Crawls a web application starting at ``base_url``, records the links and
    forms it finds, and probes them for SQL injection, reflected XSS, missing
    security headers and enabled directory listings. All requests go through
    one shared ``requests.Session`` and are sent with ``verify=False``.

    The probes submit attack payloads to the target, so the scanner must only
    be pointed at systems you are authorized to test.
    """

    # Seconds to wait for any single HTTP response
    _REQUEST_TIMEOUT = 10

    # SQL injection payloads
    _SQL_PAYLOADS = (
        "' OR '1'='1",
        "' OR '1'='1' --",
        "' OR 1=1--",
        "' UNION SELECT NULL--",
        "'; DROP TABLE users--",
        "' AND 1=2 UNION SELECT 1,2,3--",
    )

    # Lower-case fragments that hint at a database error in a response body
    _SQL_ERROR_INDICATORS = (
        'sql syntax', 'mysql', 'postgresql', 'sqlite', 'oracle',
        'error', 'exception', 'traceback', 'at line', 'near',
        'syntax error', 'database error',
    )

    # XSS payloads
    _XSS_PAYLOADS = (
        "<script>alert('XSS')</script>",
        "<img src=x onerror=alert('XSS')>",
        "<svg onload=alert('XSS')>",
        "javascript:alert('XSS')",
        "<iframe srcdoc='<script>alert(`XSS`)</script>'></iframe>",
        "<body onload=alert('XSS')>",
    )

    # (header name, description reported when the header is absent)
    _REQUIRED_HEADERS = (
        ('X-Frame-Options', 'Missing X-Frame-Options header (clickjacking protection)'),
        ('X-Content-Type-Options', 'Missing X-Content-Type-Options header (MIME-type sniffing protection)'),
        ('X-XSS-Protection', 'Missing X-XSS-Protection header (XSS filter)'),
        ('Strict-Transport-Security', 'Missing Strict-Transport-Security header (HSTS)'),
        ('Content-Security-Policy', 'Missing Content-Security-Policy header (CSP)'),
    )

    # Common directory names to test for directory listing
    _COMMON_DIRS = (
        'admin/', 'backup/', 'config/', 'logs/', 'tmp/', 'uploads/',
        'includes/', 'libraries/', 'vendor/', 'node_modules/',
    )

    # Lower-case fragments that indicate an auto-generated directory index
    _DIR_LISTING_INDICATORS = (
        'index of /', 'parent directory', 'directory listing',
        '<title>index of', 'name size date',
    )

    def __init__(self, base_url: str, max_threads: int = 5):
        """
        Create a scanner for one target.

        Args:
            base_url: URL at which crawling starts; only links on the same
                network location are followed.
            max_threads: Stored on the instance as ``max_threads``.
        """
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'DevSecOps-DAST-Scanner/1.0',
            'Accept': '*/*',
            'Accept-Language': 'en-US,en;q=0.9'
        })
        self.max_threads = max_threads
        self.visited_urls: Set[str] = set()
        self.forms_found: List[Dict[str, Any]] = []
        self.links_found: List[str] = []
        self.security_issues: List[Dict[str, Any]] = []

    @staticmethod
    def _timestamp() -> str:
        """Return the current UTC time formatted like '2023-12-01T10:00:00Z'."""
        return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

    def _send(self, url: str, method: str, values: Dict[str, str]):
        """
        Submit ``values`` to ``url`` the way the given HTTP method would.

        'POST' sends the values as form data; any other method sends them as
        query parameters of a GET request.
        """
        if method == 'POST':
            return self.session.post(url, data=values, timeout=self._REQUEST_TIMEOUT, verify=False)
        return self.session.get(url, params=values, timeout=self._REQUEST_TIMEOUT, verify=False)

    def crawl_application(self, max_pages: int = 100) -> List[str]:
        """
        Crawl the application to discover URLs

        Performs a breadth-first walk from ``base_url``, following only links
        whose network location matches the base URL. Forms on each fetched
        page are appended to ``forms_found``. Crawling stops when the queue is
        empty or once ``max_pages`` URLs have been discovered.

        Args:
            max_pages: Discovery threshold at which crawling stops.

        Returns:
            The discovered URLs (also stored in ``links_found``).
        """
        urls_to_visit = [self.base_url]
        discovered_urls = {self.base_url}
        base_netloc = urlparse(self.base_url).netloc

        while urls_to_visit and len(discovered_urls) < max_pages:
            current_url = urls_to_visit.pop(0)

            if current_url in self.visited_urls:
                continue

            try:
                response = self.session.get(current_url, timeout=self._REQUEST_TIMEOUT, verify=False)
                self.visited_urls.add(current_url)

                soup = BeautifulSoup(response.content, 'html.parser')

                # Find all links
                for link in soup.find_all('a', href=True):
                    absolute_url = urljoin(current_url, link['href'])

                    # Only follow links within the same domain
                    if urlparse(absolute_url).netloc != base_netloc:
                        continue
                    if absolute_url not in discovered_urls:
                        discovered_urls.add(absolute_url)
                        urls_to_visit.append(absolute_url)

                # Find all forms
                for form in soup.find_all('form'):
                    # A form without an action submits to the page that
                    # contains it, so it must not be skipped; urljoin with an
                    # empty reference yields the page URL itself.
                    form_url = urljoin(current_url, form.get('action') or '')
                    self.forms_found.append({
                        'url': form_url,
                        'method': form.get('method', 'GET').upper(),
                        'fields': [
                            input_tag.get('name')
                            for input_tag in form.find_all(['input', 'textarea', 'select'])
                            if input_tag.get('name')
                        ],
                        'source_page': current_url
                    })

            except Exception as e:
                # Best effort: report the failure and keep crawling
                print(f"Error crawling {current_url}: {str(e)}")

        self.links_found = list(discovered_urls)
        return self.links_found

    def test_sql_injection(self, url: str, method: str = 'GET', params: Dict[str, str] = None) -> List[Dict[str, Any]]:
        """
        Test for SQL injection vulnerabilities

        Each parameter in turn is replaced by every payload. A finding is
        recorded when the response contains database-error indicators that
        are absent from a baseline response obtained with the unmodified
        parameters; this keeps words such as 'error' or 'near' that are part
        of the normal page from being reported as evidence.

        Args:
            url: Target URL.
            method: 'GET' (query parameters) or 'POST' (form data); any other
                value yields no findings.
            params: Parameter names mapped to their benign values.

        Returns:
            One finding per vulnerable parameter. GET findings name the input
            under 'parameter', POST findings under 'field'.
        """
        issues: List[Dict[str, Any]] = []
        if method not in ('GET', 'POST') or not params:
            return issues

        # Key under which the affected input is reported for this method
        name_key = 'parameter' if method == 'GET' else 'field'

        # Baseline: what the page looks like without any payload. If it cannot
        # be fetched, every indicator in a payload response counts.
        try:
            baseline_text = self._send(url, method, dict(params)).text.lower()
        except Exception:
            baseline_text = ''

        for param_name in params:
            for payload in self._SQL_PAYLOADS:
                test_params = dict(params)
                test_params[param_name] = payload

                try:
                    response = self._send(url, method, test_params)
                    response_text = response.text.lower()
                except Exception:
                    # Best effort: a failed probe is skipped, not fatal
                    continue

                # Check for SQL error indicators introduced by the payload
                evidence = [
                    indicator for indicator in self._SQL_ERROR_INDICATORS
                    if indicator in response_text and indicator not in baseline_text
                ]
                if evidence:
                    issues.append({
                        'type': 'SQL Injection',
                        'severity': 'high',
                        'url': url,
                        'method': method,
                        name_key: param_name,
                        'payload': payload,
                        'response_status': response.status_code,
                        'evidence': evidence,
                        'timestamp': self._timestamp()
                    })
                    break  # Stop testing this parameter if vulnerability found

        return issues

    def test_xss(self, url: str, method: str = 'GET', params: Dict[str, str] = None) -> List[Dict[str, Any]]:
        """
        Test for Cross-Site Scripting vulnerabilities

        Each parameter in turn is replaced by every payload and submitted
        with the given method ('POST' as form data, anything else as GET
        query parameters). A finding is recorded when the payload comes back
        verbatim in the response body.

        Args:
            url: Target URL.
            method: HTTP method used for the probe and recorded in findings.
            params: Parameter names mapped to their benign values.

        Returns:
            One finding per parameter that reflects a payload.
        """
        issues: List[Dict[str, Any]] = []

        for param_name in params or []:
            for payload in self._XSS_PAYLOADS:
                test_params = dict(params)
                test_params[param_name] = payload

                try:
                    response = self._send(url, method, test_params)
                    reflected = payload in response.text
                except Exception:
                    # Best effort: a failed probe is skipped, not fatal
                    continue

                # Check if payload is reflected in response
                if reflected:
                    issues.append({
                        'type': 'Cross-Site Scripting',
                        'severity': 'high',
                        'url': url,
                        'method': method,
                        'parameter': param_name,
                        'payload': payload,
                        'response_status': response.status_code,
                        'reflected': True,
                        'timestamp': self._timestamp()
                    })
                    break  # Stop testing this parameter if XSS found

        return issues

    def test_security_headers(self, url: str) -> List[Dict[str, Any]]:
        """
        Test for missing security headers

        Args:
            url: URL to fetch with a GET request.

        Returns:
            One 'Missing Security Header' finding per absent header, or a
            single 'Security Header Test Error' finding (severity 'error')
            when the request fails.
        """
        issues: List[Dict[str, Any]] = []

        try:
            response = self.session.get(url, timeout=self._REQUEST_TIMEOUT, verify=False)
            headers = response.headers

            # Check for important security headers
            for header_name, description in self._REQUIRED_HEADERS:
                if header_name not in headers:
                    issues.append({
                        'type': 'Missing Security Header',
                        'severity': 'medium',
                        'url': url,
                        'header': header_name,
                        'description': description,
                        'timestamp': self._timestamp()
                    })

        except Exception as e:
            issues.append({
                'type': 'Security Header Test Error',
                'severity': 'error',
                'url': url,
                'error': str(e),
                'timestamp': self._timestamp()
            })

        return issues

    def test_directory_listing(self, url: str) -> List[Dict[str, Any]]:
        """
        Test for directory listing vulnerabilities

        Requests a fixed set of common directory names resolved against
        ``url`` and reports those that answer with status 200 and a body that
        looks like an auto-generated index.

        Args:
            url: URL against which the directory names are resolved.

        Returns:
            One 'Directory Listing' finding per exposed directory.
        """
        issues: List[Dict[str, Any]] = []

        for directory in self._COMMON_DIRS:
            test_url = urljoin(url, directory)

            try:
                response = self.session.get(test_url, timeout=self._REQUEST_TIMEOUT, verify=False)

                # Only successful responses can be a listing
                if response.status_code != 200:
                    continue

                # Check for directory listing indicators
                response_text = response.text.lower()
                if any(indicator in response_text for indicator in self._DIR_LISTING_INDICATORS):
                    issues.append({
                        'type': 'Directory Listing',
                        'severity': 'medium',
                        'url': test_url,
                        'description': 'Directory listing is enabled',
                        'timestamp': self._timestamp()
                    })
            except Exception:
                # Best effort: an unreachable directory is skipped
                continue

        return issues

    def run_comprehensive_scan(self) -> List[Dict[str, Any]]:
        """
        Run comprehensive DAST scan

        Crawls the application, runs the header and directory-listing checks
        on every discovered URL, then the SQL injection and XSS checks on
        every discovered form. Progress is printed to stdout.

        Returns:
            All findings (also stored in ``security_issues``).
        """
        print(f"Starting DAST scan of {self.base_url}")

        # Crawl the application
        print("Crawling application...")
        self.crawl_application()
        print(f"Discovered {len(self.links_found)} URLs and {len(self.forms_found)} forms")

        all_issues: List[Dict[str, Any]] = []

        # Test each URL for various vulnerabilities
        for url in self.links_found:
            print(f"Testing URL: {url}")

            # Test security headers
            all_issues.extend(self.test_security_headers(url))

            # Test directory listing
            all_issues.extend(self.test_directory_listing(url))

        # Test forms for injection vulnerabilities
        for form in self.forms_found:
            print(f"Testing form: {form['url']} ({form['method']})")

            # Benign value for every named field of the form
            form_values = {field: 'test_value' for field in form['fields']}

            # Test for SQL injection
            all_issues.extend(self.test_sql_injection(form['url'], form['method'], form_values))

            # Test for XSS
            all_issues.extend(self.test_xss(form['url'], form['method'], form_values))

        self.security_issues = all_issues
        return all_issues

    def generate_report(self) -> Dict[str, Any]:
        """
        Generate DAST scan report

        Returns:
            A dictionary with the UTC time the report was built, the target
            URL, the number of URLs and forms scanned, the total number of
            issues, counts for the severities 'critical', 'high', 'medium',
            'low' and 'error', counts per issue type, and the issue list
            itself under 'detailed_issues'.
        """
        severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0, 'error': 0}
        type_counts: Dict[str, int] = {}

        # One pass over the issues fills both tallies
        for issue in self.security_issues:
            severity = issue['severity']
            if severity in severity_counts:
                severity_counts[severity] += 1

            issue_type = issue['type']
            type_counts[issue_type] = type_counts.get(issue_type, 0) + 1

        return {
            'scan_timestamp': self._timestamp(),
            'target_url': self.base_url,
            'urls_scanned': len(self.links_found),
            'forms_scanned': len(self.forms_found),
            'total_issues': len(self.security_issues),
            'issues_by_severity': severity_counts,
            'issues_by_type': type_counts,
            'detailed_issues': self.security_issues
        }

# Example usage
def run_dast_scan_example():
    """
    Demonstrate a full DAST run

    Scans "https://example.com" with a DASTScanner, prints a short summary,
    writes the whole report to 'dast-report.json' and returns the report
    dictionary.
    """
    import json

    # Initialize scanner
    dast = DASTScanner("https://example.com")

    # The scan keeps its issues on the scanner; the report is built from there
    dast.run_comprehensive_scan()
    summary = dast.generate_report()

    # Print summary
    print(f"\nDAST Scan Results:")
    print(f"URLs scanned: {summary['urls_scanned']}")
    print(f"Forms scanned: {summary['forms_scanned']}")
    print(f"Total issues found: {summary['total_issues']}")
    by_severity = summary['issues_by_severity']
    print(f"High severity: {by_severity['high']}")
    print(f"Medium severity: {by_severity['medium']}")

    # Output detailed report
    with open('dast-report.json', 'w') as report_file:
        json.dump(summary, report_file, indent=2)

    return summary

# Run example (commented out to prevent actual scanning)
# report = run_dast_scan_example()

Interactive Application Security Testing (IAST)

IAST Implementation and Integration

PYTHON
# Example: IAST agent implementation
import sys
import traceback
from typing import Dict, Any, List, Optional
import json
import threading
import time
from datetime import datetime
import inspect

class IASTAgent:
    """
    Interactive Application Security Testing agent

    Wraps callables (see ``instrument_method``), marks their string arguments
    as tainted and, after every call, runs heuristic detectors for SQL
    injection, XSS, path traversal and command injection. The detectors look
    at the wrapped method's name and at the tainted positional and keyword
    arguments. Events are collected in ``security_events`` and echoed to
    stdout.
    """

    def __init__(self):
        self.security_events = []
        self.taint_tracking_enabled = True
        # Detector name (used as the event type) -> detector callable
        self.vulnerability_detectors = {
            'sql_injection': self.detect_sql_injection,
            'xss': self.detect_xss,
            'path_traversal': self.detect_path_traversal,
            'command_injection': self.detect_command_injection
        }
        # id(value) -> taint record; see mark_as_tainted
        self.tracked_variables = {}
        self.session_id = self.generate_session_id()

    def generate_session_id(self) -> str:
        """
        Generate unique session identifier
        """
        import uuid
        return str(uuid.uuid4())

    def instrument_method(self, method):
        """
        Decorator to instrument methods for security monitoring

        The returned wrapper marks user-input-like arguments as tainted,
        calls ``method``, runs all detectors on the call and returns the
        method's result. Exceptions raised by ``method`` are logged as
        'exception_occurred' events and re-raised unchanged.

        Args:
            method: The callable to wrap.

        Returns:
            The wrapper, carrying the wrapped callable's metadata.
        """
        import functools

        # functools.wraps copies __name__/__doc__ (and the rest of the
        # metadata) and exposes the original callable as __wrapped__
        @functools.wraps(method)
        def wrapper(*args, **kwargs):
            # Track method entry
            qualname = method.__qualname__
            method_info = {
                'method_name': method.__name__,
                'class_name': qualname.split('.')[0] if '.' in qualname else 'module',
                'args': str(args)[:1000],  # Limit length
                'kwargs': str(kwargs)[:1000],  # Limit length
                'timestamp': datetime.utcnow().isoformat(),
                'thread_id': threading.current_thread().ident
            }

            # Mark arguments as potentially tainted
            for i, arg in enumerate(args):
                if self.is_user_input(arg):
                    self.mark_as_tainted(arg, f'arg_{i}', method_info)

            for key, value in kwargs.items():
                if self.is_user_input(value):
                    self.mark_as_tainted(value, f'kwarg_{key}', method_info)

            try:
                result = method(*args, **kwargs)

                # Check for vulnerabilities after method execution
                self.check_for_vulnerabilities(method_info, args, kwargs, result)

                return result
            except Exception as e:
                # Log exceptions for security analysis
                self.log_security_event('exception_occurred', {
                    'method': method.__name__,
                    'exception': str(e),
                    'traceback': traceback.format_exc(),
                    'timestamp': datetime.utcnow().isoformat()
                })
                raise

        return wrapper

    def is_user_input(self, value) -> bool:
        """
        Determine if a value comes from user input

        Heuristic: every non-empty string that does not start with '__' is
        treated as user input; any other non-None value is treated as user
        input when its string form contains one of the markers 'user_',
        'input_', 'request_', 'param_' or 'query_'.
        """
        if value is None:
            return False

        # Check if it's a string that could be user input
        if isinstance(value, str):
            # Common indicators of user input
            return len(value) > 0 and not value.startswith('__')  # Not a system value

        # Check if it's from common user input sources
        str_val = str(value).lower()
        return any(source in str_val for source in [
            'user_', 'input_', 'request_', 'param_', 'query_'
        ])

    def mark_as_tainted(self, value, label: str, context: Dict[str, Any]):
        """
        Mark a value as potentially tainted

        The record is keyed by ``id(value)`` and stores the value itself.

        Args:
            value: The object to mark.
            label: Where the value was seen (e.g. 'arg_0', 'kwarg_name').
            context: Call information; its 'method_name' becomes the source.
        """
        self.tracked_variables[id(value)] = {
            'value': value,
            'label': label,
            'context': context,
            'tainted': True,
            'sources': [context.get('method_name', 'unknown')]
        }

    def is_tainted(self, value) -> bool:
        """
        Check if a value is marked as tainted
        """
        record = self.tracked_variables.get(id(value))
        return record is not None and record['tainted']

    def check_for_vulnerabilities(self, method_info: Dict[str, Any], args, kwargs, result):
        """
        Check for vulnerabilities in method execution

        Runs every registered detector and logs each finding under the
        detector's name. A detector that raises is logged as a
        'detector_error' event instead of interrupting the call.
        """
        # Check each vulnerability type
        for vuln_type, detector in self.vulnerability_detectors.items():
            try:
                for vuln in detector(method_info, args, kwargs, result):
                    self.log_security_event(vuln_type, vuln)
            except Exception as e:
                self.log_security_event('detector_error', {
                    'detector': vuln_type,
                    'error': str(e),
                    'method': method_info['method_name']
                })

    def _scan_tainted_arguments(self, method_info: Dict[str, Any], args, kwargs,
                                method_keywords, is_suspicious,
                                vuln_type: str, severity: str) -> List[Dict[str, Any]]:
        """
        Shared detector logic: report suspicious tainted string arguments.

        Args:
            method_info: Call information built by the instrumentation wrapper.
            args: Positional arguments of the call.
            kwargs: Keyword arguments of the call.
            method_keywords: The detector only applies when the lower-cased
                method name contains one of these fragments.
            is_suspicious: Predicate applied to each tainted string argument.
            vuln_type: Value of the finding's 'type' key.
            severity: Value of the finding's 'severity' key.

        Returns:
            One finding per matching argument. Positional findings carry
            'argument_index'; keyword findings carry 'argument_index' None
            plus 'argument_name'.
        """
        method_name = method_info['method_name'].lower()
        if not any(keyword in method_name for keyword in method_keywords):
            return []

        # (location keys, value) for positional, then keyword arguments
        candidates = [({'argument_index': i}, arg) for i, arg in enumerate(args)]
        candidates.extend(
            ({'argument_index': None, 'argument_name': key}, value)
            for key, value in (kwargs or {}).items()
        )

        vulnerabilities = []
        for location, value in candidates:
            if not (isinstance(value, str) and self.is_tainted(value)):
                continue
            if not is_suspicious(value):
                continue

            finding = {
                'type': vuln_type,
                'severity': severity,
                'method': method_info['method_name'],
            }
            finding.update(location)
            finding.update({
                'tainted_value': str(value)[:200],  # Limit length
                'context': method_info,
                'timestamp': datetime.utcnow().isoformat()
            })
            vulnerabilities.append(finding)

        return vulnerabilities

    def detect_sql_injection(self, method_info: Dict[str, Any], args, kwargs, result) -> List[Dict[str, Any]]:
        """
        Detect potential SQL injection vulnerabilities

        Applies when the method name suggests a database operation; flags
        tainted string arguments that contain an SQL keyword.
        """
        sql_keywords = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'UNION']
        return self._scan_tainted_arguments(
            method_info, args, kwargs,
            method_keywords=['query', 'execute', 'select', 'insert', 'update', 'delete'],
            is_suspicious=lambda text: any(keyword in text.upper() for keyword in sql_keywords),
            vuln_type='SQL Injection',
            severity='high'
        )

    def detect_xss(self, method_info: Dict[str, Any], args, kwargs, result) -> List[Dict[str, Any]]:
        """
        Detect potential XSS vulnerabilities

        Applies when the method name suggests output/rendering; flags tainted
        string arguments that contain script-like markup.
        """
        script_markers = ['<script', 'javascript:', 'onerror=', 'onclick=']
        return self._scan_tainted_arguments(
            method_info, args, kwargs,
            method_keywords=['render', 'display', 'show', 'output', 'write', 'response'],
            is_suspicious=lambda text: any(marker in text.lower() for marker in script_markers),
            vuln_type='Cross-Site Scripting',
            severity='high'
        )

    def detect_path_traversal(self, method_info: Dict[str, Any], args, kwargs, result) -> List[Dict[str, Any]]:
        """
        Detect potential path traversal vulnerabilities

        Applies when the method name suggests file operations; flags tainted
        string arguments that contain '..'.
        """
        return self._scan_tainted_arguments(
            method_info, args, kwargs,
            method_keywords=['file', 'read', 'write', 'open', 'download', 'upload', 'include'],
            # '..' also covers the '../' and '..\\' forms
            is_suspicious=lambda text: '..' in text,
            vuln_type='Path Traversal',
            severity='high'
        )

    def detect_command_injection(self, method_info: Dict[str, Any], args, kwargs, result) -> List[Dict[str, Any]]:
        """
        Detect potential command injection vulnerabilities

        Applies when the method name suggests system/command execution; flags
        tainted string arguments that contain shell metacharacters.
        """
        shell_tokens = [';', '|', '&', '`', '$(', ')']
        return self._scan_tainted_arguments(
            method_info, args, kwargs,
            method_keywords=['exec', 'system', 'shell', 'command', 'run', 'subprocess'],
            is_suspicious=lambda text: any(token in text for token in shell_tokens),
            vuln_type='Command Injection',
            severity='critical'
        )

    def log_security_event(self, event_type: str, details: Dict[str, Any]):
        """
        Log a security event

        Args:
            event_type: Category of the event (detector name,
                'exception_occurred' or 'detector_error').
            details: Event payload.
        """
        event = {
            'event_type': event_type,
            'details': details,
            'session_id': self.session_id,
            'timestamp': datetime.utcnow().isoformat()
        }

        self.security_events.append(event)

        # Print to console for immediate visibility (in real implementation, this might go to a security SIEM)
        print(f"SECURITY EVENT [{event_type}]: {details}")

    def get_security_events(self) -> List[Dict[str, Any]]:
        """
        Get all collected security events
        """
        return self.security_events

    def generate_report(self) -> Dict[str, Any]:
        """
        Generate IAST report

        Returns:
            A dictionary with the session id, the report time, the number of
            events, counts per event type, counts for the severities
            'critical', 'high', 'medium' and 'low' (read from each event's
            details), and the event list itself under 'detailed_events'.
        """
        events = self.security_events

        severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
        type_counts: Dict[str, int] = {}

        # One pass over the events fills both tallies
        for event in events:
            severity = event['details'].get('severity')
            if severity in severity_counts:
                severity_counts[severity] += 1

            event_type = event['event_type']
            type_counts[event_type] = type_counts.get(event_type, 0) + 1

        return {
            'session_id': self.session_id,
            'report_timestamp': datetime.utcnow().isoformat(),
            'total_events': len(events),
            'events_by_type': type_counts,
            'events_by_severity': severity_counts,
            'detailed_events': events
        }

# Example application code with IAST instrumentation
class UserService:
    """
    Example service class to demonstrate IAST instrumentation

    The agent is supplied per instance, so the service methods are wrapped
    with ``iast_agent.instrument_method`` in ``__init__`` rather than with a
    class-level decorator (no agent exists yet while the class body runs).
    The methods only simulate their work by printing; the insecure string
    building inside them is intentional demo material.
    """

    # Names of the methods that get instrumented on every instance
    _INSTRUMENTED_METHODS = ('get_user_by_id', 'display_user_profile', 'read_user_file')

    def __init__(self, iast_agent: "IASTAgent"):
        """
        Args:
            iast_agent: Agent whose ``instrument_method`` wraps the service
                methods; also stored as ``self.iast_agent``.
        """
        self.iast_agent = iast_agent

        # Replace each bound method on this instance with its instrumented form
        for method_name in self._INSTRUMENTED_METHODS:
            bound_method = getattr(self, method_name)
            setattr(self, method_name, iast_agent.instrument_method(bound_method))

    def get_user_by_id(self, user_id: str):
        """
        Get user by ID (potentially vulnerable to SQL injection)

        Returns:
            A dictionary with the given id and a fixed name.
        """
        # Simulate database query - this would be vulnerable if user_id is not sanitized
        query = f"SELECT * FROM users WHERE id = {user_id}"
        print(f"Executing query: {query}")
        return {"id": user_id, "name": "John Doe"}

    def display_user_profile(self, user_data: Dict[str, str]):
        """
        Display user profile (potentially vulnerable to XSS)

        Returns:
            The HTML snippet built from ``user_data['name']`` (empty string
            when the key is missing).
        """
        # Simulate HTML output - this would be vulnerable to XSS
        html_output = f"<div>User: {user_data.get('name', '')}</div>"
        print(f"Displaying: {html_output}")
        return html_output

    def read_user_file(self, filename: str):
        """
        Read user file (potentially vulnerable to path traversal)

        Returns:
            A fixed placeholder string; no file is actually read.
        """
        # Simulate file reading - this would be vulnerable to path traversal
        safe_path = f"/safe/user/files/{filename}"
        print(f"Reading file: {safe_path}")
        return "File contents"

# Example usage
def run_iast_example():
    """
    Run the IAST demo end to end and return the generated report

    Builds an agent and an instrumented UserService, calls each service
    method with a hostile payload, prints a short summary of the agent's
    report and writes the full report to 'iast-report.json' in the current
    working directory.
    """
    agent = IASTAgent()
    service = UserService(agent)

    print("Testing IAST with simulated user inputs...")

    # One hostile payload per method; each is intended to trigger the
    # detection named alongside it.
    attack_calls = (
        (service.get_user_by_id, "1 OR 1=1"),  # SQL injection
        (service.display_user_profile, {"name": "<script>alert('XSS')</script>"}),  # XSS
        (service.read_user_file, "../../etc/passwd"),  # path traversal
    )
    for call, payload in attack_calls:
        call(payload)

    report = agent.generate_report()

    # Console summary
    print("\nIAST Report:")
    print(f"Total events: {report['total_events']}")
    print(f"Critical: {report['events_by_severity']['critical']}")
    print(f"High: {report['events_by_severity']['high']}")
    print(f"Events by type: {report['events_by_type']}")

    # Persist the detailed report
    with open('iast-report.json', 'w') as report_file:
        json.dump(report, report_file, indent=2)

    return report

# Run example
# report = run_iast_example()

Security Testing Automation

Automated Security Testing Pipeline

Security Testing Orchestration

YAML
# Example: Security testing pipeline configuration
#
# Three scanning jobs (static, dynamic, composition) upload their raw results
# as artifacts; security-policy-check then downloads every artifact, applies
# the policy, builds a dashboard and comments on the pull request.
#
# Scanner steps use `continue-on-error: true` because these tools exit
# non-zero when they report findings. Without it the job would stop before
# the results are uploaded and the policy job would be skipped exactly when
# there is something to evaluate.
name: Security Testing Pipeline
on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  security-static-analysis:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install bandit flake8 safety

      - name: Run Bandit security scan
        continue-on-error: true
        run: |
          bandit -r . -f json -o bandit-results.json

      - name: Run Safety dependency check
        continue-on-error: true
        run: |
          safety check -r requirements.txt --json > safety-results.json

      - name: Run custom SAST scan
        run: |
          python security_tools/custom_sast_scanner.py --path . --output sast-results.json

      - name: Analyze results
        run: |
          python security_tools/result_analyzer.py --bandit bandit-results.json --safety safety-results.json --sast sast-results.json

      - name: Upload security results
        # Keep the raw results even when an earlier step failed
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: security-static-results
          path: |
            bandit-results.json
            safety-results.json
            sast-results.json

  security-dynamic-analysis:
    runs-on: ubuntu-latest
    needs: security-static-analysis
    steps:
      - uses: actions/checkout@v4

      - name: Deploy to test environment
        run: |
          # Deploy application to temporary test environment
          # (Compose v2 plugin; the standalone docker-compose v1 binary is
          # no longer shipped on GitHub-hosted runners)
          docker compose -f docker-compose.test.yml up -d
          sleep 30  # Wait for application to start

      - name: Run ZAP baseline scan
        continue-on-error: true
        run: |
          # --network host: inside the container "localhost" would otherwise be the container itself
          # -v ...:/zap/wrk/: file options such as -g require the work directory to be mounted
          # --user root: lets the container write gen.conf into the runner-owned workspace
          docker run --user root --network host -v "$(pwd):/zap/wrk/:rw" -t ghcr.io/zaproxy/zaproxy:stable zap-baseline.py -t http://localhost:8080 -g gen.conf

      - name: Run custom DAST scan
        run: |
          python security_tools/custom_dast_scanner.py --target http://localhost:8080 --output dast-results.json

      - name: Upload DAST results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: security-dynamic-results
          path: dast-results.json

      - name: Tear down test environment
        if: always()
        run: |
          docker compose -f docker-compose.test.yml down

  security-composition-analysis:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run Snyk security scan
        uses: snyk/actions/python@master
        continue-on-error: true
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
        with:
          # `args` is passed to the Snyk CLI, not to a shell, so a `>` redirect
          # does not create a file; let the CLI write the JSON report itself
          args: --file=requirements.txt --json-file-output=snyk-results.json

      - name: Run Dependency-Check
        uses: dependency-check/Dependency-Check_Action@main
        id: depcheck
        with:
          project: 'MyProject'
          path: '.'
          format: 'JSON'
          out: 'reports'

      - name: Upload composition analysis results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: security-composition-results
          path: |
            snyk-results.json
            reports/

  security-policy-check:
    runs-on: ubuntu-latest
    needs: [security-static-analysis, security-dynamic-analysis, security-composition-analysis]
    permissions:
      contents: read         # checkout
      pull-requests: write   # post the summary comment
    steps:
      # security_tools/ and security-policy.json live in the repository
      - uses: actions/checkout@v4

      - name: Download all security results
        # No artifact name given: each artifact is extracted into its own
        # sub-directory of security-results/, named after the artifact
        uses: actions/download-artifact@v4
        with:
          path: security-results/

      - name: Check security policies
        run: |
          python security_tools/policy_checker.py --results-dir security-results/ --policy security-policy.json

      - name: Generate security dashboard
        run: |
          python security_tools/dashboard_generator.py --results-dir security-results/ --output security-dashboard.html

      - name: Upload security dashboard
        uses: actions/upload-artifact@v4
        with:
          name: security-dashboard
          path: security-dashboard.html

      - name: Post security summary to PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('security-results/security-static-results/bandit-results.json', 'utf8'));
            // Bandit's JSON report keys its totals as 'SEVERITY.HIGH', 'SEVERITY.MEDIUM', ...
            const highCount = results.metrics._totals['SEVERITY.HIGH'];
            const comment = `Security Scan Results:\n\nCritical: ${highCount} high severity issues found.`;
            // Await the request so a failed API call fails the step
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });

Security Testing Quality Gates

Quality Gate Implementation

PYTHON
# Example: Security testing quality gates
import json
from typing import Dict, Any, List
from dataclasses import dataclass

@dataclass
class QualityGateResult:
    """
    Outcome of evaluating a single security quality gate
    """
    # True when every enforced severity count is within its policy threshold
    passed: bool
    # One entry per critical/high finding, followed by one 'policy_violation'
    # entry for each threshold that was exceeded
    violations: List[Dict[str, Any]]
    # Per-severity counts plus the 'quality_gate_passed' flag
    summary: Dict[str, int]

class SecurityQualityGate:
    """
    Security quality gates for CI/CD pipelines

    Each gate counts the critical and high severity entries of one scanner
    report and compares the counts with thresholds taken from the policy
    configuration. Entries with any other severity are ignored.
    """

    # Severities that are counted and enforced; also the order in which
    # threshold violations are appended to the result.
    _GATED_SEVERITIES = ('critical', 'high')

    def __init__(self, policy_config: Dict[str, Any]):
        """
        Store the policy: a mapping of 'max_<severity>_..._vulnerabilities'
        keys to the highest count each gate tolerates. Missing keys fall
        back to the per-gate defaults.
        """
        self.policy_config = policy_config

    def _evaluate_gate(
        self,
        findings: Any,
        *,
        violation_suffix: str,
        detail_fields: Dict[str, str],
        scope: str,
        default_limits: Dict[str, int]
    ) -> QualityGateResult:
        """
        Shared implementation behind the three public gate checks

        findings: list of finding dicts from the scanner report; None is
            treated as an empty list.
        violation_suffix: appended to the severity to form the violation
            'type', e.g. 'finding' gives 'critical_finding'.
        detail_fields: ordered mapping of violation key -> finding key to
            copy from; absent finding keys are reported as 'unknown'.
        scope: '' for the static gate, otherwise the word inserted into the
            policy keys, summary keys and messages (e.g. 'dependency').
        default_limits: threshold per severity used when the policy does
            not define one.
        """
        counts = {severity: 0 for severity in self._GATED_SEVERITIES}
        violations = []

        for finding in findings or []:
            # Reports may carry a null or non-string severity; treat it like
            # a missing one instead of failing on .lower()
            severity = str(finding.get('severity') or 'medium').lower()
            if severity not in counts:
                continue
            counts[severity] += 1
            violation = {'type': f'{severity}_{violation_suffix}', 'severity': severity}
            for violation_key, finding_key in detail_fields.items():
                violation[violation_key] = finding.get(finding_key, 'unknown')
            violations.append(violation)

        key_infix = f'{scope}_' if scope else ''
        label = f'{scope} ' if scope else ''

        # Check against policy thresholds
        passed = True
        summary = {}
        for severity in self._GATED_SEVERITIES:
            count = counts[severity]
            limit = self.policy_config.get(
                f'max_{severity}_{key_infix}vulnerabilities', default_limits[severity]
            )
            summary[f'{severity}_{key_infix}vulnerabilities'] = count
            if count > limit:
                passed = False
                violations.append({
                    'type': 'policy_violation',
                    'severity': severity,
                    'description': f'Too many {severity} {label}vulnerabilities: {count} > {limit}'
                })
        summary['quality_gate_passed'] = passed

        return QualityGateResult(passed=passed, violations=violations, summary=summary)

    def check_static_analysis_gate(self, scan_results: Dict[str, Any]) -> QualityGateResult:
        """
        Check static analysis quality gate

        Reads scan_results['results'] and enforces
        'max_critical_vulnerabilities' (default 0) and
        'max_high_vulnerabilities' (default 5).
        """
        return self._evaluate_gate(
            scan_results.get('results'),
            violation_suffix='finding',
            detail_fields={'location': 'location', 'description': 'description'},
            scope='',
            default_limits={'critical': 0, 'high': 5}
        )

    def check_dependency_analysis_gate(self, scan_results: Dict[str, Any]) -> QualityGateResult:
        """
        Check dependency analysis quality gate

        Reads scan_results['vulnerabilities'] and enforces
        'max_critical_dependency_vulnerabilities' (default 0) and
        'max_high_dependency_vulnerabilities' (default 3).
        """
        return self._evaluate_gate(
            scan_results.get('vulnerabilities'),
            violation_suffix='dependency_vulnerability',
            # The report's 'id' and 'title' are exposed as 'cve' and 'description'
            detail_fields={'package': 'package', 'cve': 'id', 'description': 'title'},
            scope='dependency',
            default_limits={'critical': 0, 'high': 3}
        )

    def check_dynamic_analysis_gate(self, scan_results: Dict[str, Any]) -> QualityGateResult:
        """
        Check dynamic analysis quality gate

        Reads scan_results['issues'] and enforces
        'max_critical_dynamic_vulnerabilities' (default 0) and
        'max_high_dynamic_vulnerabilities' (default 2).
        """
        return self._evaluate_gate(
            scan_results.get('issues'),
            violation_suffix='runtime_vulnerability',
            detail_fields={'url': 'url', 'description': 'description'},
            scope='dynamic',
            default_limits={'critical': 0, 'high': 2}
        )

    def run_all_quality_gates(self, scan_results: Dict[str, Any]) -> Dict[str, QualityGateResult]:
        """
        Run all quality gates and return results

        Only gates whose key ('static_analysis', 'dependency_analysis',
        'dynamic_analysis') is present in scan_results are evaluated; the
        returned dict uses the same keys.
        """
        gate_checks = {
            'static_analysis': self.check_static_analysis_gate,
            'dependency_analysis': self.check_dependency_analysis_gate,
            'dynamic_analysis': self.check_dynamic_analysis_gate
        }
        return {
            gate_name: check(scan_results[gate_name])
            for gate_name, check in gate_checks.items()
            if gate_name in scan_results
        }

    def overall_quality_gate_passed(self, gate_results: Dict[str, QualityGateResult]) -> bool:
        """
        Check if overall quality gate passes (all individual gates must pass)

        Note that an empty gate_results passes, because there is no failing
        gate to report.
        """
        return all(result.passed for result in gate_results.values())

# Example usage
def run_security_quality_gates():
    """
    Example of running security quality gates
    """
    # Define security policy
    security_policy = {
        'max_critical_vulnerabilities': 0,
        'max_high_vulnerabilities': 5,
        'max_critical_dependency_vulnerabilities': 0,
        'max_high_dependency_vulnerabilities': 3,
        'max_critical_dynamic_vulnerabilities': 0,
        'max_high_dynamic_vulnerabilities': 2
    }
    
    # Initialize quality gate checker
    quality_gate = SecurityQualityGate(security_policy)
    
    # Example scan results (in real implementation, these would come from actual scanners)
    scan_results = {
        'static_analysis': {
            'results': [
                {
                    'severity': 'high',
                    'location': 'app/models/user.py:45',
                    'description': 'SQL injection vulnerability'
                },
                {
                    'severity': 'medium',
                    'location': 'app/views/auth.py:123',
                    'description': 'Weak password policy'
                }
            ]
        },
        'dependency_analysis': {
            'vulnerabilities': [
                {
                    'severity': 'high',
                    'package': 'django',
                    'id': 'CVE-2023-1234',
                    'title': 'Django security issue'
                }
            ]
        },
        'dynamic_analysis': {
            'issues': [
                {
                    'severity': 'medium',
                    'url': 'https://test.example.com/login',
                    'description': 'Missing security headers'
                }
            ]
        }
    }
    
    # Run all quality gates
    gate_results = quality_gate.run_all_quality_gates(scan_results)
    
    # Check overall result
    overall_passed = quality_gate.overall_quality_gate_passed(gate_results)
    
    print(f"Overall quality gate passed: {overall_passed}")
    
    # Print individual results
    for gate_name, result in gate_results.items():
        print(f"{gate_name}: {'PASS' if result.passed else 'FAIL'}")
        print(f"  Summary: {result

You might also like

Browse all articles
Series

Virtual Networking with VMware

Comprehensive guide to VMware virtual networking, including vSwitches, port groups, VLANs, and network configuration best practices.

#VMware#Networking#vSwitch
Series

vCenter Server and Centralized Management

Complete guide to VMware vCenter Server and centralized management, covering installation, configuration, and management of VMware environments.

#VMware#vCenter Server#Centralized Management
Series

Storage Virtualization with VMware

Complete guide to VMware storage virtualization, including datastore types, storage protocols, and storage management strategies.

#VMware#Storage#Datastore
Series

Security Best Practices in VMware Environments

Comprehensive guide to security best practices in VMware environments, covering ESXi hardening, vCenter security, network security, and compliance.

#VMware#Security#Hardening