Threat Enforcement Implementation
This section implements the core threat enforcement system that automatically applies graduated security responses based on threat intelligence data. The system fetches threat data from our Intelligence Collector, validates it, categorizes threats by severity, and applies appropriate enforcement actions through Cloudflare's IP Lists.
System Architecture Flow
The enforcement system follows this logical flow:
- Data Validation - Ensures threat data integrity and filters invalid entries
- Response Logic - Categorizes threats into graduated response levels (Block, Challenge, Rate Limit)
- IP List Management - Creates and manages Cloudflare IP Lists for each response level
- Real-time Enforcement - Updates Cloudflare rules to apply security policies
- Statistics & Monitoring - Tracks enforcement effectiveness and maintains audit trails
Validate Threat Data
Purpose: This validation layer ensures data integrity and prevents enforcement failures due to malformed threat intelligence. It acts as a critical safety checkpoint before applying security policies.
Why it's essential: Invalid data could cause enforcement failures, allowing threats to slip through or blocking legitimate traffic. This validation prevents such scenarios by sanitizing all incoming threat data.
import { ThreatIP } from '../types';
// Validate and clean threat data from Intelligence Collector
export function validateThreatData(threats: any[]): ThreatIP[] {
const validThreats: ThreatIP[] = [];
let invalidCount = 0;
for (const threat of threats) {
try {
// Validate required fields
if (!threat.ip || typeof threat.ip !== 'string') {
throw new Error('Missing or invalid IP field');
}
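// Number.isFinite also rejects NaN and Infinity, which pass a plain typeof check
if (typeof threat.score !== 'number' || !Number.isFinite(threat.score) || threat.score < 0) {
throw new Error('Missing or invalid score field');
}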
if (!Array.isArray(threat.sources)) {
throw new Error('Missing or invalid sources field');
}
if (typeof threat.is_whitelisted !== 'boolean') {
throw new Error('Missing or invalid is_whitelisted field');
}
// Validate IP address format (basic IPv4 check) on the trimmed value,
// so surrounding whitespace does not cause an otherwise valid entry to be dropped
const ip = threat.ip.trim();
if (!isValidIPv4(ip)) {
throw new Error(`Invalid IPv4 address: ${threat.ip}`);
}
// Create clean threat object
const cleanThreat: ThreatIP = {
ip,
score: Math.max(0, Math.floor(threat.score)), // Ensure non-negative integer
sources: threat.sources.filter((s: string) => typeof s === 'string' && s.trim().length > 0),
first_seen: threat.first_seen || new Date().toISOString(),
last_seen: threat.last_seen || new Date().toISOString(),
is_whitelisted: Boolean(threat.is_whitelisted)
};
validThreats.push(cleanThreat);
} catch (err) {
const error = err as Error;
console.warn(`Invalid threat data skipped:`, error.message, threat);
invalidCount++;
}
}
if (invalidCount > 0) {
console.log(`Filtered out ${invalidCount} invalid threat entries`);
}
console.log(`Validated ${validThreats.length} clean threat entries`);
return validThreats;
}
// Basic IPv4 validation
function isValidIPv4(ip: string): boolean {
const ipv4Regex = /^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$/;
return ipv4Regex.test(ip);
}
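The regular expression in isValidIPv4 accepts octets with leading zeros (for example 192.0.2.01), and it rejects IPv6 addresses and CIDR ranges outright. If stricter IPv4 matching is wanted, one option is to check each octet explicitly. The sketch below is a minimal illustration, not part of the original module; isStrictIPv4 is a hypothetical name.

```typescript
// Hypothetical helper (not part of the original module): validates a dotted-quad
// IPv4 address octet by octet instead of with a single regular expression.
function isStrictIPv4(ip: string): boolean {
  const parts = ip.split('.');
  if (parts.length !== 4) return false;
  return parts.every((part) => {
    // Only 1-3 ASCII digits are allowed; this rejects '', '+1', '1e2' and ' 1'.
    if (!/^\d{1,3}$/.test(part)) return false;
    // Reject leading zeros such as '01', which some parsers read as octal.
    if (part.length > 1 && part.startsWith('0')) return false;
    return Number(part) <= 255;
  });
}

console.log(isStrictIPv4('192.0.2.1'));  // true
console.log(isStrictIPv4('192.0.2.01')); // false (leading zero)
console.log(isStrictIPv4('256.0.0.1'));  // false (octet out of range)
```

Whether leading zeros should be rejected depends on how the upstream feeds format their addresses, so treat the stricter rule as a policy choice rather than a requirement.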
IP List Management
Purpose: This module manages the lifecycle of Cloudflare IP Lists, which are the actual enforcement mechanisms that control traffic access. It handles initialization, storage, and retrieval of list identifiers.
Key responsibilities:
- One-time Setup: Creates the required IP Lists in Cloudflare (Block, Challenge, Rate Limit)
- Persistent Storage: Saves list IDs in KV storage for future enforcement cycles
- Error Recovery: Handles cases where lists already exist or need re-initialization
Why separation matters: By separating list management from enforcement logic, we ensure consistent list handling and prevent duplicate list creation across multiple worker invocations.
import { Env, ResponseLevel } from '../types';
import { CloudflareIPLists } from './cloudflare-ip-lists';
// Initialize Cloudflare IP Lists on first run
export async function initializeIPLists(env: Env): Promise<Partial<Record<ResponseLevel, string>>> {
console.log('Initializing Cloudflare IP Lists...');
const ipLists = new CloudflareIPLists(env.CF_API_TOKEN, env.CF_ACCOUNT_ID);
// Test API connection first
const connected = await ipLists.testConnection();
if (!connected) {
throw new Error('Failed to connect to Cloudflare API. Check your API token and account ID.');
}
// Create or find existing lists
const listIds = await ipLists.createEnforcementLists();
// Store list IDs in KV for future use
for (const [level, listId] of Object.entries(listIds)) {
await env.ENFORCEMENT_DATA.put(`list_ids:${level}`, listId);
console.log(`Stored list ID for ${level}: ${listId}`);
}
return listIds;
}
// Get stored list IDs or initialize if they don't exist
export async function getIPListIds(env: Env): Promise<Partial<Record<ResponseLevel, string>>> {
const listIds: Partial<Record<ResponseLevel, string>> = {};
const levels = [ResponseLevel.BLOCK, ResponseLevel.CHALLENGE, ResponseLevel.RATE_LIMIT];
let needsInitialization = false;
for (const level of levels) {
const listId = await env.ENFORCEMENT_DATA.get(`list_ids:${level}`);
if (listId) {
listIds[level] = listId;
} else {
needsInitialization = true;
break;
}
}
if (needsInitialization) {
console.log('List IDs not found, initializing...');
return await initializeIPLists(env);
}
return listIds;
}
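The get-or-initialize pattern used by getIPListIds can be exercised without a real KV namespace. The sketch below assumes an in-memory stand-in for the two KV methods the module calls (get and put); MemoryKV, ListLevel, getOrInitListIds, and the string values of the levels are hypothetical names chosen for illustration.

```typescript
// In-memory stand-in for the KV get/put calls used by the module (illustrative only).
class MemoryKV {
  private store = new Map<string, string>();
  async get(key: string): Promise<string | null> {
    return this.store.get(key) ?? null;
  }
  async put(key: string, value: string): Promise<void> {
    this.store.set(key, value);
  }
}

type ListLevel = 'block' | 'challenge' | 'rate_limit'; // assumed ResponseLevel values
const LIST_LEVELS: ListLevel[] = ['block', 'challenge', 'rate_limit'];

// Same shape as getIPListIds: return the stored IDs, or run the initializer
// once and persist its result as soon as any ID is missing.
async function getOrInitListIds(
  kv: MemoryKV,
  init: () => Promise<Record<ListLevel, string>>
): Promise<Record<ListLevel, string>> {
  const found: Partial<Record<ListLevel, string>> = {};
  for (const level of LIST_LEVELS) {
    const id = await kv.get(`list_ids:${level}`);
    if (!id) {
      const created = await init();
      for (const l of LIST_LEVELS) {
        await kv.put(`list_ids:${l}`, created[l]);
      }
      return created;
    }
    found[level] = id;
  }
  return found as Record<ListLevel, string>;
}

// Usage: two lookups against the same store should run the initializer once.
async function demoListIdLookup(): Promise<number> {
  const kv = new MemoryKV();
  let initCalls = 0;
  const init = async (): Promise<Record<ListLevel, string>> => {
    initCalls++;
    return { block: 'id-block', challenge: 'id-challenge', rate_limit: 'id-rate' };
  };
  await getOrInitListIds(kv, init);
  await getOrInitListIds(kv, init);
  return initCalls;
}
```

The second lookup finds all three keys in the store, so the initializer is not called again. This is the property that prevents duplicate list creation across worker invocations.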
Cloudflare API Integration
Purpose: This is the core integration layer that communicates directly with Cloudflare's API to manage IP Lists. It handles the complex aspects of API communication, batch operations, and error handling.
Critical functionality:
- List Creation: Automatically creates IP Lists for each enforcement level with descriptive names
- Batch Updates: Efficiently handles large IP sets by processing them in API-compliant batches (1000 IPs per request)
- List Replacement: Uses a clear-then-add strategy so each update starts from an empty list; the list is briefly empty between the two steps
- Rate Limiting: Waits 100 ms between batch requests to respect Cloudflare's API rate limits
- Error Recovery: Gracefully handles duplicate list creation and API failures
Why this approach: Rather than adding/removing individual IPs, we replace entire lists to maintain consistency and avoid state drift between enforcement cycles.
import Cloudflare from 'cloudflare';
import { ResponseLevel } from '../types';
export class CloudflareIPLists {
private client: Cloudflare;
private accountId: string;
constructor(apiToken: string, accountId: string) {
this.client = new Cloudflare({
apiToken: apiToken
});
this.accountId = accountId;
}
// Create IP lists for each enforcement level
async createEnforcementLists(): Promise<Partial<Record<ResponseLevel, string>>> {
const lists: Partial<Record<ResponseLevel, string>> = {};
// Create lists for active enforcement levels (excluding LOG_ONLY)
const levelsToCreate = [ResponseLevel.BLOCK, ResponseLevel.CHALLENGE, ResponseLevel.RATE_LIMIT];
for (const level of levelsToCreate) {
try {
console.log(`Creating IP list for ${level}...`);
const response = await this.client.rules.lists.create({
account_id: this.accountId,
name: `threat_${level}`,
description: `IPs requiring ${level} enforcement - Auto-managed by Enforcement Engine`,
kind: 'ip'
});
lists[level] = response.id;
console.log(`✓ Created list '${response.name}' with ID: ${response.id}`);
} catch (err) {
const error = err as Error;
// Handle case where list might already exist
if (error.message.includes('already exists') || error.message.includes('duplicate')) {
console.log(`List for ${level} already exists, finding existing ID...`);
const existingId = await this.findListByName(`threat_${level}`);
if (existingId) {
lists[level] = existingId;
console.log(`✓ Found existing list for ${level}: ${existingId}`);
}
} else {
console.error(`Failed to create list for ${level}:`, error);
throw error;
}
}
}
return lists;
}
// Find existing list by name
private async findListByName(name: string): Promise<string | null> {
try {
const lists = await this.client.rules.lists.list({
account_id: this.accountId
});
const found = lists.result.find(list => list.name === name);
return found ? found.id! : null;
} catch (error) {
console.error('Failed to find existing lists:', error);
return null;
}
}
// Update IP list with new threats (replaces all existing items).
// An empty ips array is valid: it clears the list so stale entries do not linger.
async updateList(listId: string, ips: string[]): Promise<void> {
if (!listId) {
console.log('Skipping list update - no list ID provided');
return;
}
try {
console.log(`Updating list ${listId} with ${ips.length} IPs...`);
// Step 1: Clear existing items
await this.clearList(listId);
// Step 2: Add new items in batches (Cloudflare limit: 1000 items per request)
const batches = this.chunkArray(ips, 1000);
for (let i = 0; i < batches.length; i++) {
const batch = batches[i];
console.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} items)...`);
const items = batch.map(ip => ({
ip: ip,
comment: `Auto-added ${new Date().toISOString()}`
}));
await this.client.rules.lists.items.create(listId, {
account_id: this.accountId,
body: items
});
console.log(`✓ Added batch ${i + 1} with ${batch.length} IPs`);
// Add small delay between batches to respect rate limits
if (i < batches.length - 1) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}
console.log(`✓ Successfully updated list ${listId} with ${ips.length} total IPs`);
} catch (error) {
console.error(`Failed to update list ${listId}:`, error);
throw error;
}
}
// Clear all items from a list
private async clearList(listId: string): Promise<void> {
try {
console.log(`Clearing existing items from list ${listId}...`);
// Delete all items (this clears the entire list)
await this.client.rules.lists.items.delete(listId, {
account_id: this.accountId
});
console.log(`✓ Cleared list ${listId}`);
} catch (err) {
const error = err as Error;
// Don't throw on clear failures - list might already be empty
console.warn(`Warning: Failed to clear list ${listId}:`, error.message);
}
}
// Get current items in a list (for debugging)
async getListItems(listId: string): Promise<any[]> {
try {
const response = await this.client.rules.lists.items.list(listId, {
account_id: this.accountId
});
return response.result || [];
} catch (error) {
console.error(`Failed to get items from list ${listId}:`, error);
return [];
}
}
// Get list information
async getListInfo(listId: string): Promise<any> {
try {
const response = await this.client.rules.lists.get(listId, {
account_id: this.accountId
});
return response;
} catch (error) {
console.error(`Failed to get list info for ${listId}:`, error);
return null;
}
}
// Utility function to split array into chunks
private chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
// Test API connectivity
async testConnection(): Promise<boolean> {
try {
console.log('Testing Cloudflare API connection...');
const response = await this.client.accounts.tokens.verify({ account_id: this.accountId });
console.log(`✓ API connection successful. Token status: ${response.status === "active" ? 'valid' : 'invalid'}`);
return response.status === "active";
} catch (error) {
console.error('✗ API connection failed:', error);
return false;
}
}
}
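The batching step in updateList can be checked in isolation. The sketch below copies the chunking logic into a standalone function and adds a guard against a non-positive batch size, which is an addition for this example only. The addresses are synthetic values from a private range.

```typescript
// Standalone copy of the chunking logic so the batch arithmetic can be verified.
function chunkArray<T>(array: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    // Guard added for this sketch: a size of 0 would otherwise loop forever.
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < array.length; i += size) {
    chunks.push(array.slice(i, i + size));
  }
  return chunks;
}

// 2,500 synthetic addresses from 10.0.0.0/8 (illustrative data only).
const sampleIps = Array.from(
  { length: 2500 },
  (_, i) => `10.0.${Math.floor(i / 250)}.${i % 250}`
);
const sampleBatches = chunkArray(sampleIps, 1000);
console.log(sampleBatches.map((b) => b.length)); // [ 1000, 1000, 500 ]
```

With the 100 ms pause applied between batches but not after the last one, three batches add two pauses, or roughly 200 ms of deliberate delay per list update.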
Enforcement Statistics
Purpose: This module maintains comprehensive statistics about enforcement activities, providing visibility into system performance and threat distribution patterns.
Key metrics tracked:
- Threat Distribution: Counts of IPs in each enforcement category (Block/Challenge/Rate Limit)
- Processing Statistics: Total threats processed and enforcement effectiveness
- Audit Trail: Timestamps and historical data for compliance and monitoring
- Response Mapping: Which specific IPs are assigned to which enforcement levels
Business value: These statistics enable security teams to understand threat patterns, measure enforcement effectiveness, and make data-driven decisions about security policies.
import { Env, EnforcementStats, ResponseLevel } from '../types';
// Get current enforcement statistics
export async function getEnforcementStats(env: Env): Promise<EnforcementStats> {
try {
const statsJson = await env.ENFORCEMENT_DATA.get('enforcement:stats');
if (statsJson) {
return JSON.parse(statsJson);
}
} catch (error) {
console.error('Failed to get enforcement stats:', error);
}
// Return default stats if none exist
return {
last_update: '',
total_threats: 0,
blocked_ips: 0,
challenged_ips: 0,
rate_limited_ips: 0,
enforcement_rules: {
[ResponseLevel.BLOCK]: [],
[ResponseLevel.CHALLENGE]: [],
[ResponseLevel.RATE_LIMIT]: [],
[ResponseLevel.LOG_ONLY]: []
}
};
}
// Update enforcement statistics after processing
export async function updateEnforcementStats(grouped: Record<string, string[]>, env: Env): Promise<void> {
const stats: EnforcementStats = {
last_update: new Date().toISOString(),
total_threats: Object.values(grouped).reduce((sum, ips) => sum + ips.length, 0),
blocked_ips: grouped[ResponseLevel.BLOCK]?.length || 0,
challenged_ips: grouped[ResponseLevel.CHALLENGE]?.length || 0,
rate_limited_ips: grouped[ResponseLevel.RATE_LIMIT]?.length || 0,
enforcement_rules: grouped as Record<ResponseLevel, string[]>
};
await env.ENFORCEMENT_DATA.put('enforcement:stats', JSON.stringify(stats));
console.log(`Updated enforcement stats: ${stats.total_threats} threats processed`);
}
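The counting in updateEnforcementStats is easier to test when it is separated from the KV write. The following sketch shows one way to do that; summarize, StatsSummary, and the string values of the levels are assumptions made for illustration, not names from the original code.

```typescript
type StatLevel = 'block' | 'challenge' | 'rate_limit'; // assumed ResponseLevel values

interface StatsSummary {
  total_threats: number;
  blocked_ips: number;
  challenged_ips: number;
  rate_limited_ips: number;
}

// Pure counterpart of the counting done in updateEnforcementStats: no KV access,
// so it can be unit-tested directly. A missing level counts as zero.
function summarize(grouped: Partial<Record<StatLevel, string[]>>): StatsSummary {
  const count = (level: StatLevel): number => grouped[level]?.length ?? 0;
  return {
    total_threats: count('block') + count('challenge') + count('rate_limit'),
    blocked_ips: count('block'),
    challenged_ips: count('challenge'),
    rate_limited_ips: count('rate_limit'),
  };
}

// Documentation-range addresses (RFC 5737) used as sample data.
const summary = summarize({
  block: ['198.51.100.7', '203.0.113.9'],
  challenge: ['192.0.2.44'],
});
console.log(summary.total_threats, summary.blocked_ips, summary.challenged_ips, summary.rate_limited_ips);
// 3 2 1 0
```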
Threat Data Caching
Purpose: Implements a resilient caching strategy to ensure enforcement continuity even when the Intelligence Collector is temporarily unavailable.
Resilience strategy:
- Successful Data Caching: Every successful threat data fetch is cached with metadata
- Fallback Capability: When the Intelligence Collector is unavailable, the system can fall back to cached data
- Cache Age Tracking: Monitors how old cached data is to make informed decisions about using stale data
- Graceful Degradation: Enables continued operation during service outages or network issues
Why caching matters: Security enforcement shouldn't stop because one service is down. This caching system ensures continuous protection even during temporary service disruptions.
import { Env, ThreatIP } from "../types";
// Cache successful threat data for fallback scenarios
export async function cacheThreats(threats: ThreatIP[], env: Env): Promise<void> {
try {
const cacheData = {
threats: threats,
cached_at: new Date().toISOString(),
count: threats.length
};
await env.ENFORCEMENT_DATA.put('cached_threats', JSON.stringify(cacheData.threats));
await env.ENFORCEMENT_DATA.put('cache_metadata', JSON.stringify({
cached_at: cacheData.cached_at,
count: cacheData.count
}));
console.log(`Cached ${threats.length} threats for fallback scenarios`);
} catch (error) {
console.warn('Failed to cache threat data:', error);
}
}
// Get cache age in minutes
export async function getCacheAge(env: Env): Promise<number> {
try {
const metadata = await env.ENFORCEMENT_DATA.get('cache_metadata');
if (metadata) {
const { cached_at } = JSON.parse(metadata);
const cacheTime = new Date(cached_at).getTime();
// An unparseable timestamp produces NaN; fall through to Infinity in that case
if (!Number.isNaN(cacheTime)) {
return Math.floor((Date.now() - cacheTime) / (1000 * 60));
}
}
} catch (error) {
console.warn('Failed to get cache age:', error);
}
return Infinity; // Very old cache
}
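Cache age only helps if something acts on it. The sketch below shows one possible staleness policy built on the same age calculation. The 120-minute ceiling and the helper names are assumptions for illustration; the original system does not specify a limit.

```typescript
// Assumed policy value, not taken from the original system.
const MAX_CACHE_AGE_MINUTES = 120;

// Age of a cache entry in whole minutes relative to a reference time.
function cacheAgeMinutes(cachedAt: string, referenceTime: Date): number {
  const cachedMs = new Date(cachedAt).getTime();
  // An unparseable timestamp yields NaN; treat it like a missing cache.
  if (Number.isNaN(cachedMs)) return Infinity;
  return Math.floor((referenceTime.getTime() - cachedMs) / 60000);
}

// Hypothetical policy check: cached data is usable while it is within the ceiling.
function isCacheUsable(
  cachedAt: string,
  referenceTime: Date,
  maxAgeMinutes: number = MAX_CACHE_AGE_MINUTES
): boolean {
  return cacheAgeMinutes(cachedAt, referenceTime) <= maxAgeMinutes;
}

const referenceTime = new Date('2024-01-01T12:00:00Z');
console.log(cacheAgeMinutes('2024-01-01T11:15:00Z', referenceTime)); // 45
console.log(isCacheUsable('2024-01-01T11:15:00Z', referenceTime));   // true
console.log(isCacheUsable('2024-01-01T08:00:00Z', referenceTime));   // false (240 minutes old)
console.log(isCacheUsable('not-a-date', referenceTime));             // false
```

Passing the reference time as a parameter keeps the functions deterministic, which makes the policy straightforward to test.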
Graduated Response Logic
Purpose: This is the decision engine that implements our graduated security response strategy. It translates raw threat scores into actionable security policies based on well-defined business rules.
Decision matrix:
- Score ≥4 → BLOCK: High confidence threats from multiple sources get immediate denial
- Score 2-3 → CHALLENGE: Medium confidence threats require CAPTCHA verification
- Score 1 → RATE_LIMIT: Low confidence threats get throttled to limit potential impact
- Score 0 → LOG_ONLY: Clean IPs are allowed with logging for monitoring
- Whitelisted → LOG_ONLY: Safety override - whitelisted IPs are never blocked regardless of score
Strategic benefit: This graduated approach prevents over-blocking while ensuring appropriate response levels. It balances security effectiveness with user experience by applying proportional responses based on threat confidence.
import { ResponseLevel, EnforcementRule, ThreatIP } from '../types';
// Determine appropriate response based on threat score
export function determineResponse(threatScore: number, isWhitelisted: boolean): EnforcementRule {
// Whitelist always takes precedence (safety first)
if (isWhitelisted) {
return {
level: ResponseLevel.LOG_ONLY,
action: 'allow',
description: 'Whitelisted IP - no enforcement applied'
};
}
// Graduated response based on threat confidence
if (threatScore >= 4) {
return {
level: ResponseLevel.BLOCK,
action: 'deny',
description: 'High confidence threat from multiple sources - block immediately'
};
} else if (threatScore >= 2) {
return {
level: ResponseLevel.CHALLENGE,
action: 'challenge',
description: 'Medium confidence threat - require CAPTCHA verification'
};
} else if (threatScore >= 1) {
return {
level: ResponseLevel.RATE_LIMIT,
action: 'rate_limit',
description: 'Low confidence threat - rate limit to 10 requests/minute'
};
} else {
return {
level: ResponseLevel.LOG_ONLY,
action: 'allow',
description: 'Clean IP - allow with logging only'
};
}
}
// Group threat IPs by their appropriate response level
export function groupThreatsByResponse(threats: ThreatIP[]): Record<string, string[]> {
const grouped: Record<string, string[]> = {
[ResponseLevel.BLOCK]: [],
[ResponseLevel.CHALLENGE]: [],
[ResponseLevel.RATE_LIMIT]: []
};
for (const threat of threats) {
// Skip whitelisted IPs - they won't be enforced
if (threat.is_whitelisted) continue;
const response = determineResponse(threat.score, false);
// Only add IPs that require active enforcement
if (response.level !== ResponseLevel.LOG_ONLY) {
grouped[response.level].push(threat.ip);
}
}
return grouped;
}
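The decision matrix can also be written as a threshold table, which makes the boundaries easy to verify with a few sample scores. This is a self-contained sketch; levelFor, THRESHOLDS, and the string values of the levels are assumptions for illustration and stand in for determineResponse and ResponseLevel.

```typescript
type ResponseLevelName = 'block' | 'challenge' | 'rate_limit' | 'log_only'; // assumed values

// Thresholds from the decision matrix, ordered from highest to lowest so the
// first match wins.
const THRESHOLDS: Array<{ min: number; level: ResponseLevelName }> = [
  { min: 4, level: 'block' },
  { min: 2, level: 'challenge' },
  { min: 1, level: 'rate_limit' },
];

function levelFor(score: number, isWhitelisted: boolean): ResponseLevelName {
  if (isWhitelisted) return 'log_only'; // whitelist overrides any score
  const match = THRESHOLDS.find((t) => score >= t.min);
  return match ? match.level : 'log_only';
}

const samples: Array<[number, boolean]> = [
  [5, false],
  [3, false],
  [1, false],
  [0, false],
  [5, true],
];
for (const [score, whitelisted] of samples) {
  console.log(score, whitelisted, levelFor(score, whitelisted));
}
// 5 false block
// 3 false challenge
// 1 false rate_limit
// 0 false log_only
// 5 true log_only
```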
Service Binding: Intelligence Collector
Purpose: This module implements the critical service-to-service communication layer between the Enforcement Engine and Intelligence Collector. It uses Cloudflare Service Bindings for secure, efficient, and reliable inter-worker communication without external network calls.
Service Binding Architecture:
- Low-Overhead Communication: Service bindings let one Worker call another inside Cloudflare's network, avoiding the latency of an external API call over the public internet
- Managed Routing: Cloudflare routes each request to the bound Intelligence Collector Worker, so there are no instances or endpoints to manage
- Built-in Security: Service bindings provide secure communication channels without exposing services to the internet
- Resource Efficiency: Calls skip DNS lookups, TLS negotiation, and public routing, which reduces per-request overhead
Key Implementation Features:
- Paginated Data Retrieval: Handles large threat datasets efficiently by fetching data in pages rather than overwhelming single requests
- Comprehensive Error Handling: Implements multiple fallback strategies when the Intelligence Collector is unavailable
- Request Validation: Validates response structure and content type to ensure data integrity
- Performance Monitoring: Tracks request processing times and provides detailed logging for operational visibility
- Graceful Degradation: Maintains enforcement capabilities even during upstream service failures
Fallback Strategy Architecture:
- Primary: Direct service binding communication with real-time threat data
- Secondary: Cached threat data from previous successful runs; getCacheAge reports how old the cache is, but the fallback path does not enforce an age limit on its own
- Emergency: Hardcoded critical threat list for absolute worst-case scenarios
Production Benefits:
- High Availability: System continues operating even when Intelligence Collector experiences downtime
- Data Freshness: Prioritizes real-time data while maintaining backup options
- Audit Trail: Comprehensive logging of all communication attempts and fallback usage
- Monitoring Integration: Detailed metrics for service health and performance tracking
import { Env, ThreatIP, IntelligenceResponse } from '../types';
import { validateThreatData } from './validation';
// Fetch all threat data from Intelligence Collector Worker via Service Binding with pagination
export async function getThreatsFromCollector(env: Env): Promise<ThreatIP[]> {
if (!env.INTELLIGENCE_COLLECTOR) {
throw new Error('INTELLIGENCE_COLLECTOR service binding not configured');
}
console.log('Fetching threat data via Service Binding...');
try {
let allThreats: ThreatIP[] = [];
let currentPage = 1;
let hasNextPage = true;
let totalThreats = 0;
let totalPages = 0;
while (hasNextPage) {
// Create a new request for the /ips endpoint with pagination
const ipsRequest = new Request(`https://intelligence-collector/ips?page=${currentPage}`, {
method: 'GET',
headers: {
'User-Agent': 'Enforcement-Engine/1.0',
'Accept': 'application/json',
'X-Service': 'enforcement-engine'
}
});
console.log(`Fetching page ${currentPage}...`);
const response = await env.INTELLIGENCE_COLLECTOR.fetch(ipsRequest);
if (!response.ok) {
const errorText = await response.text().catch(() => 'Unknown error');
throw new Error(`Intelligence Collector returned ${response.status}: ${errorText}`);
}
const contentType = response.headers.get('content-type');
if (!contentType || !contentType.includes('application/json')) {
throw new Error(`Intelligence Collector returned non-JSON response: ${contentType}`);
}
const data: IntelligenceResponse = await response.json();
// Validate response structure
if (!data.success) {
throw new Error('Intelligence Collector returned unsuccessful response');
}
if (!Array.isArray(data.data)) {
throw new Error('Intelligence Collector response missing data array');
}
// Store pagination info from first page
if (currentPage === 1) {
totalThreats = data.pagination.total;
totalPages = Math.ceil(totalThreats / data.pagination.limit);
console.log(`Total threats: ${totalThreats}, Total pages: ${totalPages}`);
}
// Add current page threats to collection
allThreats.push(...data.data);
console.log(`✓ Fetched page ${currentPage} (${data.data.length} threats)`);
console.log(` Request ID: ${data.metadata.request_id}`);
console.log(` Processing time: ${data.metadata.processing_time_ms}ms`);
// Check if there are more pages
hasNextPage = data.pagination.has_next;
currentPage++;
}
console.log(`✓ Successfully fetched all ${allThreats.length} threats from Intelligence Collector`);
// Validate and clean threat data
const validThreats = validateThreatData(allThreats);
return validThreats;
} catch (err) {
const error = err as Error;
console.error('Failed to fetch threats from Intelligence Collector:', error);
// Implement fallback strategy
return await handleIntelligenceCollectorFailure(env, error);
}
}
// Handle Intelligence Collector failures with fallback strategies
export async function handleIntelligenceCollectorFailure(env: Env, error: Error): Promise<ThreatIP[]> {
console.error('Intelligence Collector communication failed, implementing fallback...');
// Strategy 1: Try to use cached data from previous successful run
try {
const cachedThreats = await env.ENFORCEMENT_DATA.get('cached_threats');
if (cachedThreats) {
const threats = JSON.parse(cachedThreats);
if (Array.isArray(threats) && threats.length > 0) {
console.log(`Using cached threat data: ${threats.length} threats`);
// Mark cache usage for monitoring
await env.ENFORCEMENT_DATA.put('last_fallback', JSON.stringify({
timestamp: new Date().toISOString(),
reason: error.message,
method: 'cached_data'
}));
return threats;
}
}
} catch (cacheError) {
console.warn('Failed to retrieve cached threat data:', cacheError);
}
// Strategy 2: Use emergency threat list (known bad actors)
console.log('No cached data available, using emergency threat list');
const emergencyThreats: ThreatIP[] = [
{
ip: '192.0.2.100', // RFC 5737 test IP
score: 5,
sources: ['emergency-list'],
first_seen: new Date().toISOString(),
last_seen: new Date().toISOString(),
is_whitelisted: false
}
];
// Mark emergency usage for monitoring
await env.ENFORCEMENT_DATA.put('last_fallback', JSON.stringify({
timestamp: new Date().toISOString(),
reason: error.message,
method: 'emergency_list'
}));
return emergencyThreats;
}
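getThreatsFromCollector checks success and data, but reads data.pagination.total, limit, and has_next without verifying that they exist. A type guard is one way to validate the whole envelope before the pagination loop relies on it. The sketch below is an assumption-laden illustration: PageEnvelope is inferred from the fields the fetch code reads and may not match the real IntelligenceResponse type, and isPageEnvelope is a hypothetical name.

```typescript
// Shape inferred from the fields the fetch loop reads; the real
// IntelligenceResponse type in '../types' may differ.
interface PageEnvelope {
  success: boolean;
  data: unknown[];
  pagination: { total: number; limit: number; has_next: boolean };
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

// Hypothetical type guard: narrows an untrusted JSON body to PageEnvelope.
function isPageEnvelope(body: unknown): body is PageEnvelope {
  if (!isRecord(body)) return false;
  if (typeof body.success !== 'boolean' || !Array.isArray(body.data)) return false;
  const pagination = body.pagination;
  if (!isRecord(pagination)) return false;
  const total = pagination.total;
  const limit = pagination.limit;
  const hasNext = pagination.has_next;
  return (
    typeof total === 'number' && total >= 0 &&
    // limit is used as a divisor when computing the page count, so it must be positive
    typeof limit === 'number' && limit > 0 &&
    typeof hasNext === 'boolean'
  );
}

const goodPage = { success: true, data: [], pagination: { total: 0, limit: 50, has_next: false } };
const missingPagination = { success: true, data: [] };
const zeroLimit = { success: true, data: [], pagination: { total: 10, limit: 0, has_next: true } };

console.log(isPageEnvelope(goodPage));          // true
console.log(isPageEnvelope(missingPagination)); // false
console.log(isPageEnvelope(zeroLimit));         // false
```

A malformed page that fails the guard can be routed to the same fallback path as any other collector failure. A cap on the number of pages fetched would additionally protect the loop against a response that always reports has_next as true.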
Main Enforcement Orchestration
Purpose: This is the central orchestration function that coordinates all enforcement activities. It represents the complete end-to-end enforcement workflow from data collection to rule application.
Complete workflow:
- Data Collection: Fetches fresh threat intelligence from the Intelligence Collector service
- Data Validation: Ensures all threat data meets quality standards before processing
- Response Categorization: Groups threats by their appropriate enforcement levels
- Infrastructure Management: Ensures Cloudflare IP Lists exist and are properly configured
- Rule Application: Updates live security rules with current threat data
- Audit & Statistics: Records enforcement actions and maintains performance metrics
- Error Handling: Implements comprehensive error recovery and logging
Production considerations:
- Fail-Fast Updates: A failed list update aborts the run and is recorded in KV; lists updated earlier in the same run keep their new contents, so updates are not atomic across lists
- Graceful Degradation: Continues operating even when upstream services are unavailable
- Comprehensive Logging: Provides detailed audit trails for security operations
- Performance Monitoring: Tracks enforcement effectiveness and system health
import { Env, ResponseLevel } from '../types';
import { getThreatsFromCollector } from './intelligence-collector';
import { groupThreatsByResponse } from './response-logic';
import { getIPListIds } from './ip-list-management';
import { CloudflareIPLists } from './cloudflare-ip-lists';
import { cacheThreats } from './validation';
import { updateEnforcementStats } from './stats';
// Main enforcement function - now with real Cloudflare API integration
export async function enforceThreats(env: Env): Promise<void> {
console.log('Starting threat enforcement with Intelligence Collector integration...');
try {
// Step 1: Fetch threat data from Intelligence Collector
const threats = await getThreatsFromCollector(env);
if (threats.length === 0) {
console.log('No threats received - clearing all enforcement lists');
const listIds = await getIPListIds(env);
const ipLists = new CloudflareIPLists(env.CF_API_TOKEN, env.CF_ACCOUNT_ID);
// Clear all lists when no threats
for (const listId of Object.values(listIds)) {
if (listId) {
await ipLists.updateList(listId, []);
}
}
// updateEnforcementStats is already imported at the top of this module
await updateEnforcementStats({}, env);
return;
}
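// Step 2: Cache the data for fallback scenarios.
// getThreatsFromCollector also returns fallback data when the collector is down,
// so this call re-caches that data and refreshes its cached_at timestamp.
await cacheThreats(threats, env);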
// Step 3: Group threats by response level
const grouped = groupThreatsByResponse(threats);
console.log('Threat distribution by response level:');
Object.entries(grouped).forEach(([level, ips]) => {
console.log(` ${level}: ${ips.length} IPs`);
});
// Step 4: Apply enforcement via Cloudflare IP Lists
const listIds = await getIPListIds(env);
const ipLists = new CloudflareIPLists(env.CF_API_TOKEN, env.CF_ACCOUNT_ID);
for (const [level, ips] of Object.entries(grouped)) {
const listId = listIds[level as ResponseLevel];
if (listId) {
console.log(`Updating ${level} enforcement (${listId}): ${ips.length} IPs`);
await ipLists.updateList(listId, ips);
} else {
console.warn(`No list ID found for ${level} - skipping enforcement`);
}
}
// Step 5: Update statistics and audit trail
await updateEnforcementStats(grouped, env);
// Step 6: Record successful enforcement
await env.ENFORCEMENT_DATA.put('last_successful_enforcement', JSON.stringify({
timestamp: new Date().toISOString(),
threats_processed: threats.length,
service_binding: 'INTELLIGENCE_COLLECTOR',
enforcement_distribution: Object.fromEntries(
Object.entries(grouped).map(([level, ips]) => [level, ips.length])
)
}));
console.log('✓ Threat enforcement completed successfully');
} catch (err) {
const error = err as Error;
console.error('✗ Threat enforcement failed:', error);
// Record failure for monitoring
await env.ENFORCEMENT_DATA.put('last_enforcement_failure', JSON.stringify({
timestamp: new Date().toISOString(),
error: error.message,
service_binding: 'INTELLIGENCE_COLLECTOR'
}));
throw error;
}
}
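The Step 4 loop in enforceThreats stops at the first failed list update, which leaves earlier lists updated and later ones untouched. If per-level visibility matters more than failing fast, one alternative is to attempt every level and record each outcome. The sketch below illustrates that alternative under stated assumptions; applyPerLevel and LevelOutcome are hypothetical names, and the update callback stands in for a call such as ipLists.updateList(listId, ips).

```typescript
interface LevelOutcome {
  level: string;
  ok: boolean;
  error?: string;
}

// Hypothetical helper: applies one update per level and records each outcome
// instead of aborting on the first failure.
async function applyPerLevel(
  grouped: Record<string, string[]>,
  update: (level: string, ips: string[]) => Promise<void>
): Promise<LevelOutcome[]> {
  const outcomes: LevelOutcome[] = [];
  for (const [level, ips] of Object.entries(grouped)) {
    try {
      await update(level, ips);
      outcomes.push({ level, ok: true });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      outcomes.push({ level, ok: false, error: message });
    }
  }
  return outcomes;
}

// Usage with a stub that fails for one level (simulated failure, sample data).
async function demoApplyPerLevel(): Promise<LevelOutcome[]> {
  return applyPerLevel(
    { block: ['198.51.100.7'], challenge: ['192.0.2.44'] },
    async (level) => {
      if (level === 'challenge') throw new Error('simulated API failure');
    }
  );
}
```

The caller can then decide whether any failed level should mark the whole run as failed. This still does not make the run atomic; it only makes the partial state explicit in the audit record.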
Scheduled Execution Integration
Purpose: Integrates the enforcement system with Cloudflare Workers' scheduled event system to enable automatic, periodic threat enforcement.
Scheduling strategy:
- 30-minute intervals: Balances freshness with resource efficiency (the interval itself is set by the Worker's cron trigger configuration)
- Error isolation: Scheduled failures don't affect API endpoints
- Comprehensive logging: All scheduled activities are logged for monitoring
- Graceful failure handling: Errors are logged but don't crash the worker
Production benefits: This scheduled approach ensures continuous protection without manual intervention, automatically adapting to new threats as they're identified by the Intelligence Collector.
import { Env } from './types';
import { enforceThreats } from './lib/enforcement';
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext) {
// ... other code ...
},
// Scheduled function for automatic enforcement
async scheduled(event: ScheduledEvent, env: Env): Promise<void> {
console.log('Starting scheduled enforcement...');
try {
await enforceThreats(env);
console.log('Scheduled enforcement completed successfully');
} catch (error) {
console.error('Scheduled enforcement failed:', error);
}
}
};
Implementation Summary
This enforcement system demonstrates several key architectural patterns for production security automation:
Microservices Integration
- Clean separation between Intelligence Collection and Enforcement
- Service-to-service communication via Cloudflare Service Bindings
- Fault-tolerant design with graceful degradation
Graduated Security Response
- Data-driven decision making based on threat confidence scores
- Proportional responses that balance security with user experience
- Whitelist override capabilities for safety
Production-Ready Patterns
- Comprehensive input validation and error handling
- Full-list replacement on every cycle to limit state drift
- Detailed logging and audit trails for compliance
- Caching strategies for service resilience
Scalable Infrastructure Management
- Efficient batch processing for large IP sets
- Fixed delays between batch requests to respect API rate limits
- Automated resource provisioning and management
This system transforms raw threat intelligence into actionable security policies while maintaining the reliability and scalability required for production environments.