Automated Threat Intelligence Collection Scheduler
Learning Objectives
In this lab, you'll:
- Understand Cloudflare Workers cron triggers for automated scheduling
- Configure the intelligence collector to run at regular intervals
- Implement scheduled collection with proper error handling
- Test and monitor automated threat intelligence updates
- Learn scheduling best practices for production security systems
Architecture Overview
The scheduler orchestrates automated threat intelligence collection through several key components:
Cron Trigger (every 15 min) → Scheduler (clean expired) → Collection (fetch feeds) → Processing (score & filter) → Storage (store in KV)
What happens during each scheduled run:
- Cleanup Phase: Remove expired threat data to maintain data quality
- Collection Phase: Fetch fresh threat intelligence from multiple sources
- Processing Phase: Score, validate, and apply whitelist filtering
- Storage Phase: Save processed data to KV storage for API access
Step 1: Define Threat Sources
We create a centralized threat source configuration because threat intelligence feeds come from many different providers, each with their own data formats, update frequencies, and reliability levels. By defining a standard structure for accessing these feeds, we can easily add new sources, assign appropriate weights based on their trustworthiness, and handle different data formats consistently. This approach allows our system to aggregate intelligence from multiple sources while properly attributing which feeds flagged each IP address, giving us better confidence in our threat assessments.
First, we configure the threat intelligence feeds our system will monitor. This defines where we get our threat data from.
import type { ThreatSource } from '../types';
// Configuration for threat intelligence feeds
// Each source defines how to fetch and score threat data
export const THREAT_SOURCES: ThreatSource[] = [
{
name: 'IPsum',
url: 'https://raw.githubusercontent.com/stamparm/ipsum/master/ipsum.txt',
weight: 1, // Lower weight = basic threat indicator
format: 'plain', // Simple text format, one IP per line
timeout: 10000, // 10 second fetch timeout
user_agent: 'Cloudflare-Intelligence-Collector/1.0'
},
{
name: 'EmergingThreats',
url: 'https://rules.emergingthreats.net/blockrules/compromised-ips.txt',
weight: 2, // Higher weight = more trusted source
format: 'plain', // Plain text format with comments
timeout: 15000, // 15 second timeout for reliability
user_agent: 'Cloudflare-Intelligence-Collector/1.0'
}
];
Key Configuration Points:
- Weight: Higher weights mean more trusted sources (EmergingThreats=2 vs IPsum=1)
- Format: Currently supports 'plain' text feeds (CSV/JSON support planned)
- Timeout: Different timeouts based on source reliability expectations
- User-Agent: Identifies our collector to the threat feed providers
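The ThreatSource type comes from ../types, which this lab doesn't reproduce. For reference, here is a minimal sketch of the shape implied by the fields used above (an assumption; your lab's types file is authoritative):
// Hypothetical sketch of the ThreatSource interface implied by the config above
export interface ThreatSource {
name: string; // Feed name, used for source attribution
url: string; // Feed download URL
weight: number; // Trust weight added to an IP's score per detection
format: 'plain' | 'csv' | 'json'; // Parser to apply ('plain' is the only one implemented)
timeout: number; // Fetch timeout in milliseconds
user_agent: string; // User-Agent header sent to the feed provider
}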
Step 2: Implement Threat Collection
We implement automated threat collection because manually downloading and parsing threat feeds is time-consuming and unreliable. By fetching from multiple sources simultaneously using parallel processing, we significantly reduce collection time and ensure our threat intelligence stays current. The error isolation approach means that if one threat feed is temporarily unavailable, our system continues working with the other sources. This automated approach ensures we always have fresh threat intelligence without requiring manual intervention, and the deduplication logic prevents the same IP from being counted multiple times when it appears in different feeds.
This module handles fetching and parsing threat intelligence from multiple sources simultaneously.
import type { ThreatSource, ThreatIP, CollectionResult } from '../types';
import { THREAT_SOURCES } from './sources';
import { isValidIP } from './ip';
// Main collection orchestrator - coordinates fetching from all threat sources
export async function collectThreatIntelligence(): Promise<CollectionResult> {
const startTime = Date.now();
const allThreats = new Map<string, ThreatIP>(); // Deduplicated threat storage
const stats = {
total_sources: THREAT_SOURCES.length,
successful_sources: [] as string[],
failed_sources: [] as string[],
total_raw_ips: 0,
unique_ips: 0,
processing_time_ms: 0
};
// Fetch from all sources in parallel for maximum efficiency
// Each source is fetched independently - if one fails, others continue
const fetchPromises = THREAT_SOURCES.map(source =>
fetchAndParseSource(source).catch(error => {
console.error(`Failed to fetch ${source.name}:`, error);
stats.failed_sources.push(source.name);
return []; // Return empty array on failure to maintain data flow
})
);
// Wait for all sources to complete (or fail)
const results = await Promise.all(fetchPromises);
// Process results from each source and merge into unified threat map
for (let i = 0; i < results.length; i++) {
const source = THREAT_SOURCES[i];
const ips = results[i];
if (ips.length > 0) {
stats.successful_sources.push(source.name);
stats.total_raw_ips += ips.length;
// Add each IP to our deduplicated threat map
// Multiple sources reporting same IP increases its threat score
for (const ip of ips) {
addThreatIP(allThreats, ip, source);
}
}
}
stats.unique_ips = allThreats.size;
stats.processing_time_ms = Date.now() - startTime;
console.log(`Collection complete: ${stats.successful_sources.length}/${stats.total_sources} sources successful, ${stats.unique_ips} unique threats`);
return { threats: allThreats, stats };
}
// Fetches and parses data from a single threat intelligence source
async function fetchAndParseSource(source: ThreatSource): Promise<string[]> {
console.log(`Fetching ${source.name} from ${source.url}`);
try {
// HTTP fetch with timeout protection and proper headers
const response = await fetch(source.url, {
headers: {
'User-Agent': source.user_agent, // Identify ourselves to the server
'Accept': 'text/plain', // We expect plain text format
},
signal: AbortSignal.timeout(source.timeout) // Prevent hanging requests
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const text = await response.text();
console.log(`${source.name}: Fetched ${text.length} bytes`);
// Parse the raw text data into clean IP addresses
const ips = parseSourceData(text, source.format);
console.log(`${source.name}: Extracted ${ips.length} IPs`);
return ips;
} catch (error) {
console.error(`${source.name} fetch failed:`, error);
throw error; // Handled by the per-source .catch in collectThreatIntelligence
}
}
// Parses different data formats to extract clean IP addresses
function parseSourceData(data: string, format: 'plain' | 'csv' | 'json'): string[] {
const ips: string[] = [];
switch (format) {
case 'plain': {
// Parse plain text feeds (most common format)
const lines = data.split('\n');
for (const line of lines) {
const trimmed = line.trim();
// Skip empty lines and comments (lines starting with #)
if (trimmed === '' || trimmed.startsWith('#')) {
continue;
}
// Extract IP address (first part before any whitespace)
// Format: "192.168.1.1 # optional comment"
const parts = trimmed.split(/\s+/);
const ip = parts[0];
// Validate IP format before adding to collection
if (isValidIP(ip)) {
ips.push(ip);
}
}
break;
}
case 'csv':
throw new Error('CSV format not yet implemented');
case 'json':
throw new Error('JSON format not yet implemented');
default:
throw new Error(`Unsupported format: ${format}`);
}
return ips;
}
// Manages threat deduplication and scoring when multiple sources report the same IP
function addThreatIP(threats: Map<string, ThreatIP>, ip: string, source: ThreatSource): void {
const timestamp = new Date().toISOString();
if (threats.has(ip)) {
// IP already exists - update with additional source information
const existing = threats.get(ip)!;
// Only add source if not already recorded (prevents double-counting)
if (!existing.sources.includes(source.name)) {
existing.sources.push(source.name); // Track which sources flagged this IP
existing.score += source.weight; // Increase confidence score
existing.last_seen = timestamp; // Update last observation time
}
} else {
// New IP - create initial threat record
const threatIP: ThreatIP = {
ip,
score: source.weight, // Initial score from this source
sources: [source.name], // First source that reported this IP
first_seen: timestamp, // When we first observed this threat
last_seen: timestamp, // Most recent observation
is_whitelisted: false // Will be checked during processing
};
threats.set(ip, threatIP);
}
}
Collection Process Explained:
- Parallel Fetching: All sources are fetched simultaneously for speed
- Error Isolation: If one source fails, others continue working
- Deduplication: Same IP from multiple sources increases confidence score
- Source Attribution: Track which feeds flagged each IP for transparency
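The collector also depends on isValidIP from ./ip, which isn't shown in this lab. A minimal IPv4-only sketch of what it might look like (an assumption; a real helper may also accept IPv6):
// Hypothetical sketch: validate a dotted-quad IPv4 address
export function isValidIP(ip: string): boolean {
const parts = ip.split('.');
if (parts.length !== 4) return false;
return parts.every(part => {
if (!/^\d{1,3}$/.test(part)) return false; // digits only, at most three of them
const value = Number(part);
return value >= 0 && value <= 255; // each octet must be 0-255
});
}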
Step 3: Implement Intelligent Scoring
We build an intelligent scoring system because not all threats are equal - some IPs might appear in only one feed while others are flagged by multiple trusted sources. By assigning weights to different threat intelligence sources and calculating confidence levels, we can prioritize our response efforts. For example, an IP flagged by both IPsum and EmergingThreats gets a higher score than one flagged by only IPsum. This scoring approach helps security teams focus on the most credible threats first, and the automatic categorization helps them understand what type of threat they're dealing with (malware, spam, scanning, etc.).
The scoring system converts raw threat data into actionable intelligence with confidence levels and risk categories.
import type { EnhancedThreatIP, ThreatScoringConfig } from "../types";
// Central configuration for threat intelligence scoring system
export const SCORING_CONFIG: ThreatScoringConfig = {
confidence_thresholds: {
low: 1, // Single source detection
medium: 2, // Multiple sources or trusted source
high: 4, // Multiple trusted sources
very_high: 6 // Highly trusted sources + multiple confirmations
},
max_age_hours: 24, // Threat data expires after 24 hours
source_weights: { // Different sources have different reliability scores
'IPsum': 1, // Basic community-maintained feed
'EmergingThreats': 2, // Commercial threat intelligence
'Spamhaus': 3, // Premium threat intelligence (not configured)
'Custom': 5 // Internal/manual threat additions
}
};
// Converts numeric score into human-readable confidence level
export function calculateConfidenceLevel(score: number): 'low' | 'medium' | 'high' | 'very_high' {
const thresholds = SCORING_CONFIG.confidence_thresholds;
// Higher scores = higher confidence in threat assessment
if (score >= thresholds.very_high) return 'very_high';
if (score >= thresholds.high) return 'high';
if (score >= thresholds.medium) return 'medium';
return 'low';
}
// Creates enhanced threat object with automatic scoring and categorization
export function createEnhancedThreatIP(ip: string, sources: string[], baseScore: number): EnhancedThreatIP {
const now = new Date();
const expiresAt = new Date(now.getTime() + SCORING_CONFIG.max_age_hours * 60 * 60 * 1000);
return {
ip,
score: baseScore,
sources,
first_seen: now.toISOString(),
last_seen: now.toISOString(),
is_whitelisted: false,
confidence_level: calculateConfidenceLevel(baseScore), // Auto-calculate confidence
risk_category: inferRiskCategory(sources), // Auto-categorize threat type
last_validation: now.toISOString(),
expires_at: expiresAt.toISOString() // Auto-expiry based on config
};
}
// Attempts to categorize threat type based on source names
export function inferRiskCategory(sources: string[]): 'spam' | 'malware' | 'botnet' | 'scanning' | 'unknown' {
// Simple heuristic based on source naming patterns
if (sources.some(s => s.toLowerCase().includes('spam'))) return 'spam';
if (sources.some(s => s.toLowerCase().includes('malware'))) return 'malware';
if (sources.some(s => s.toLowerCase().includes('botnet'))) return 'botnet';
if (sources.some(s => s.toLowerCase().includes('scan'))) return 'scanning';
return 'unknown'; // Default when we can't determine category
}
Step 4: Data Validation and Cleanup
We implement data validation and cleanup because threat intelligence has a limited lifespan - an IP that was malicious yesterday might be clean today, or the threat feed might have been updated with newer information. By automatically removing expired threat data and validating the freshness of our intelligence, we maintain high data quality and prevent our system from flagging IPs based on outdated information. This cleanup process also prevents our storage from growing indefinitely, which keeps our API responses fast and reduces storage costs.
This module ensures data quality by removing expired threats and validating current threat intelligence.
import type { Env, EnhancedThreatIP } from '../types';
import { SCORING_CONFIG } from './scoring';
export async function validateThreatData(threats: EnhancedThreatIP[]): Promise<{
valid: EnhancedThreatIP[];
expired: EnhancedThreatIP[];
validation_stats: any;
}> {
const now = new Date();
const valid: EnhancedThreatIP[] = [];
const expired: EnhancedThreatIP[] = [];
const stats = {
total_processed: threats.length,
valid_count: 0,
expired_count: 0,
stale_count: 0,
confidence_distribution: {
low: 0,
medium: 0,
high: 0,
very_high: 0
},
risk_distribution: {
spam: 0,
malware: 0,
botnet: 0,
scanning: 0,
unknown: 0
}
};
for (const threat of threats) {
const expiresAt = new Date(threat.expires_at);
const lastSeen = new Date(threat.last_seen);
const ageHours = (now.getTime() - lastSeen.getTime()) / (1000 * 60 * 60);
if (now > expiresAt || ageHours > SCORING_CONFIG.max_age_hours) {
expired.push(threat);
stats.expired_count++;
} else {
threat.last_validation = now.toISOString();
valid.push(threat);
stats.valid_count++;
stats.confidence_distribution[threat.confidence_level]++;
stats.risk_distribution[threat.risk_category]++;
}
// Flag threats within two hours of expiry as stale (expired entries also count)
if (ageHours > SCORING_CONFIG.max_age_hours - 2) {
stats.stale_count++;
}
}
console.log('Data validation complete:', {
valid: stats.valid_count,
expired: stats.expired_count,
stale: stats.stale_count
});
return { valid, expired, validation_stats: stats };
}
export async function cleanExpiredThreats(env: Env): Promise<number> {
try {
// Note: this key must match the key written by the storage step (Step 8)
const storedThreats = await env.THREAT_INTEL.get('threats:processed', 'json') as EnhancedThreatIP[] | null;
if (!storedThreats) {
return 0;
}
const validation = await validateThreatData(storedThreats);
await env.THREAT_INTEL.put('threats:processed', JSON.stringify(validation.valid));
if (validation.expired.length > 0) {
await env.THREAT_INTEL.put('threats:expired', JSON.stringify(validation.expired), {
expirationTtl: 7 * 24 * 60 * 60
});
}
console.log(`Cleaned ${validation.expired.length} expired threats`);
return validation.expired.length;
} catch (error) {
console.error('Failed to clean expired threats:', error);
return 0;
}
}
Validation Process Explained:
- Expiry Checking: Remove threats older than 24 hours
- Statistics Generation: Track data quality metrics
- Archive Management: Keep expired data for audit purposes
- Performance Optimization: Prevent KV storage bloat
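The validator reads and writes several fields of EnhancedThreatIP from ../types. For reference, a sketch of the shape pieced together from the code in this lab (an assumption; the types file is the source of truth):
// Hypothetical sketch of the EnhancedThreatIP shape used throughout this lab
export interface EnhancedThreatIP {
ip: string;
score: number;
sources: string[];
first_seen: string; // ISO 8601 timestamp of first observation
last_seen: string; // ISO 8601 timestamp of most recent observation
is_whitelisted: boolean;
confidence_level: 'low' | 'medium' | 'high' | 'very_high';
risk_category: 'spam' | 'malware' | 'botnet' | 'scanning' | 'unknown';
last_validation: string; // Updated each time the threat passes validation
expires_at: string; // ISO 8601 expiry derived from max_age_hours
}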
Step 5: Whitelist Protection System
We build a whitelist protection system because threat intelligence feeds sometimes incorrectly flag legitimate infrastructure as malicious, which could cause serious service disruptions if we blindly blocked those IPs. For example, if a threat feed accidentally flags a Cloudflare IP address, blocking it could break our own service. The whitelist ensures that critical infrastructure like CDN providers, DNS servers, and our own business-critical IP ranges are always protected from being blocked, even if they appear in threat feeds. This "safety-first" approach prevents outages while still allowing us to block genuine threats.
The whitelist system implements safety-first security by protecting essential infrastructure from being blocked.
import type { WhitelistEntry, WhitelistConfig, Env } from '../types';
import { isIpInRanges, isIpInCidr, isValidIP, isValidCidr } from './ip';
// Cloudflare's official IP ranges - these must NEVER be blocked
// Updated regularly from: https://www.cloudflare.com/ips/
const CLOUDFLARE_IP_RANGES = [
'173.245.48.0/20', // US East infrastructure
'103.21.244.0/22', // Asia Pacific
'103.22.200.0/22', // Asia Pacific
'103.31.4.0/22', // Asia Pacific
'141.101.64.0/18', // US West infrastructure
'108.162.192.0/18', // US infrastructure
'190.93.240.0/20', // South America
'188.114.96.0/20', // Europe
'197.234.240.0/22', // Africa
'198.41.128.0/17', // US infrastructure
'162.158.0.0/15', // Global Anycast
'104.16.0.0/13', // CDN infrastructure
'104.24.0.0/14', // CDN infrastructure
'172.64.0.0/13', // CDN infrastructure
'131.0.72.0/22' // Additional infrastructure
];
// Creates the default whitelist configuration with Cloudflare protection
export function getDefaultWhitelist(): WhitelistConfig {
return {
cloudflare: CLOUDFLARE_IP_RANGES, // Protected Cloudflare infrastructure
custom: [], // User-defined protected IPs (empty by default)
last_updated: new Date().toISOString()
};
}
// Checks if an IP should be protected from threat blocking
export function checkWhitelistWithConfig(ip: string, whitelist: WhitelistConfig): WhitelistEntry | null {
try {
// First check Cloudflare infrastructure ranges
if (isIpInRanges(ip, whitelist.cloudflare)) {
return {
ip: ip,
reason: 'Cloudflare infrastructure IP',
added_by: 'system',
added_at: whitelist.last_updated,
type: 'cloudflare'
};
}
// Then check custom whitelist entries (exact IPs or CIDR ranges)
for (const entry of whitelist.custom) {
if (entry.ip === ip || (entry.ip.includes('/') && isIpInCidr(ip, entry.ip))) {
return entry; // Return the custom protection entry
}
}
return null; // IP is not whitelisted
} catch (error) {
// Fail-safe: protect the IP if whitelist check fails
console.error('Whitelist check failed:', error);
return {
ip: ip,
reason: 'Whitelist check error - fail safe',
added_by: 'system',
added_at: new Date().toISOString(),
type: 'custom'
};
}
}
// Retrieves whitelist configuration from storage with automatic fallback
export async function getWhitelistConfig(env: Env): Promise<WhitelistConfig> {
try {
// Try to load custom whitelist configuration from KV storage
const stored = await env.THREAT_INTEL.get('whitelist:config', 'json') as WhitelistConfig;
if (stored) {
return stored; // Use existing custom configuration
}
// No custom config found - initialize with defaults
const defaultConfig = getDefaultWhitelist();
await env.THREAT_INTEL.put('whitelist:config', JSON.stringify(defaultConfig));
return defaultConfig;
} catch (error) {
console.error('Failed to get whitelist config:', error);
return getDefaultWhitelist(); // Fail-safe: always protect Cloudflare IPs
}
}
Whitelist Protection Logic:
- Cloudflare Protection: Always protect Cloudflare infrastructure IPs
- Custom Protection: Support for business-critical IP ranges
- Fail-Safe Design: When in doubt, protect rather than block
- CIDR Support: Handle both individual IPs and network ranges
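The whitelist leans on isIpInRanges and isIpInCidr from ./ip, which this lab doesn't show. A minimal IPv4-only sketch of how they might work (an assumption; a production helper should also handle IPv6 ranges):
// Hypothetical sketch: IPv4 CIDR matching using 32-bit integer arithmetic
function ipToInt(ip: string): number {
return ip.split('.').reduce((acc, octet) => (acc << 8) + parseInt(octet, 10), 0) >>> 0;
}
export function isIpInCidr(ip: string, cidr: string): boolean {
const [range, bitsStr] = cidr.split('/');
const bits = parseInt(bitsStr, 10);
const mask = bits === 0 ? 0 : (~0 << (32 - bits)) >>> 0; // guard: JS shift counts wrap mod 32
return ((ipToInt(ip) & mask) >>> 0) === ((ipToInt(range) & mask) >>> 0);
}
export function isIpInRanges(ip: string, ranges: string[]): boolean {
return ranges.some(range => isIpInCidr(ip, range));
}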
Step 6: Statistics Generation
We generate comprehensive statistics because security automation systems need visibility to be effectively managed and improved. Without detailed metrics, it's impossible to know if our threat feeds are working properly, whether our data quality is good, or how well our whitelist protection is performing. These statistics help operations teams monitor system health, identify when threat feeds go offline, understand data quality trends, and demonstrate the value of the security automation to management. The metrics also enable proactive maintenance by warning us about data staleness or performance issues before they impact operations.
This module generates comprehensive metrics for monitoring system health and threat intelligence quality.
import type { CollectionResult, EnhancedThreatIP, ThreatIntelligenceStats } from '../types';
import { SCORING_CONFIG } from './scoring';
// Generates comprehensive statistics about threat intelligence collection and processing
export async function generateThreatStats(
collectionResult: CollectionResult,
processedThreats: EnhancedThreatIP[],
whitelistStats: any,
validationStats: any
): Promise<ThreatIntelligenceStats> {
let activeThreats = 0;
let totalScore = 0;
let maxScore = 0;
// Calculate active threat metrics (excluding whitelisted IPs)
for (const threat of processedThreats) {
if (!threat.is_whitelisted) {
activeThreats++;
totalScore += threat.score;
if (threat.score > maxScore) {
maxScore = threat.score;
}
}
}
const averageScore = activeThreats > 0 ? totalScore / activeThreats : 0;
// Return comprehensive statistics object for monitoring and dashboards
return {
collection: {
last_run: new Date().toISOString(),
duration_ms: collectionResult.stats.processing_time_ms,
sources_attempted: collectionResult.stats.total_sources,
sources_successful: collectionResult.stats.successful_sources.length,
sources_failed: collectionResult.stats.failed_sources
},
processing: {
raw_ips_collected: collectionResult.stats.total_raw_ips, // Total IPs from all sources
unique_ips_processed: collectionResult.stats.unique_ips, // After deduplication
duplicates_removed: collectionResult.stats.total_raw_ips - collectionResult.stats.unique_ips,
validation_passed: validationStats.valid_count, // Fresh, valid threats
validation_failed: validationStats.expired_count // Expired/stale data
},
scoring: {
confidence_distribution: validationStats.confidence_distribution, // Breakdown by confidence level
risk_distribution: validationStats.risk_distribution, // Breakdown by risk category
average_score: averageScore, // Mean threat score
highest_score: maxScore // Peak threat score
},
whitelist: {
total_checked: whitelistStats.total_checked, // IPs checked against whitelist
cloudflare_protected: whitelistStats.whitelisted_cloudflare, // Cloudflare IPs protected
custom_protected: whitelistStats.whitelisted_custom, // Custom whitelist protections
active_threats: whitelistStats.threats_remaining // Actual threats (not protected)
},
data_quality: {
freshness_hours: SCORING_CONFIG.max_age_hours, // Data expiry threshold
expired_removed: validationStats.expired_count, // Cleanup activity
stale_warnings: validationStats.stale_count, // Near-expiry warnings
data_coverage_percentage: validationStats.total_processed > 0 // Overall data quality score
? Math.round((validationStats.valid_count / validationStats.total_processed) * 100)
: 0 // Guard against division by zero on an empty dataset
}
};
}
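As a concrete illustration of the arithmetic above (the numbers are made up):
// Hypothetical run: 12,000 raw IPs across both feeds, 9,500 after deduplication
const duplicatesRemoved = 12000 - 9500; // 2,500 IPs reported by more than one source
// Suppose 9,120 of the 9,500 unique IPs pass freshness validation
const coverage = Math.round((9120 / 9500) * 100); // data_coverage_percentage = 96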
Step 7: Threat Processing Pipeline
We create a unified processing pipeline because raw threat intelligence data isn't ready for production use - it needs scoring, validation, and safety checks applied in the correct sequence. The processor coordinates all these transformations to ensure that data flows through each step properly: first we enhance the raw data with scores and categories, then we apply whitelist protection to prevent false positives, then we validate data freshness. This orchestrated approach ensures that other systems consuming our threat intelligence receive high-quality, safe, and actionable data rather than raw feeds that could cause problems.
The processor orchestrates the complete threat intelligence pipeline from raw data to actionable intelligence.
import type { ThreatIP, Env, EnhancedThreatIP, ThreatIntelligenceStats } from '../types';
import { createEnhancedThreatIP } from './scoring';
import { generateThreatStats } from './stats';
import { checkWhitelistWithConfig, getWhitelistConfig } from './whitelist';
import { validateThreatData } from './validator';
import { THREAT_SOURCES } from './sources';
// Main processing pipeline that transforms raw threats into enhanced intelligence
export async function processThreats(
rawThreats: Map<string, ThreatIP>,
env: Env
): Promise<{
threats: EnhancedThreatIP[];
stats: ThreatIntelligenceStats;
}> {
const startTime = Date.now();
// Step 1: Convert to enhanced threat objects with intelligent scoring
const enhancedThreats: EnhancedThreatIP[] = [];
for (const [ip, threat] of rawThreats) {
// Transform basic threat data into enhanced format with confidence levels
const enhanced = createEnhancedThreatIP(ip, threat.sources, threat.score);
enhancedThreats.push(enhanced);
}
// Step 2: Apply whitelist filtering (safety-first approach)
const whitelistStats = { total_checked: 0, whitelisted_cloudflare: 0, whitelisted_custom: 0, threats_remaining: 0 };
// Fetch whitelist config once for the entire batch (efficiency optimization)
const whitelistConfig = await getWhitelistConfig(env);
for (const threat of enhancedThreats) {
whitelistStats.total_checked++;
const whitelistEntry = checkWhitelistWithConfig(threat.ip, whitelistConfig);
if (whitelistEntry) {
// Mark as whitelisted and track protection type
threat.is_whitelisted = true;
if (whitelistEntry.type === 'cloudflare') {
whitelistStats.whitelisted_cloudflare++;
} else {
whitelistStats.whitelisted_custom++;
}
} else {
whitelistStats.threats_remaining++; // Active threat (not protected)
}
}
// Step 3: Validate data freshness and quality
const validation = await validateThreatData(enhancedThreats);
// Step 4: Generate comprehensive statistics for monitoring
// (collection-level stats are reconstructed here from the raw threat map;
// per-source success/failure detail is only available inside the collector)
const collectionResult = {
threats: rawThreats,
stats: {
processing_time_ms: Date.now() - startTime,
total_sources: THREAT_SOURCES.length,
successful_sources: THREAT_SOURCES.map(s => s.name),
failed_sources: [],
total_raw_ips: rawThreats.size,
unique_ips: rawThreats.size
}
};
// Combine all processing statistics into unified metrics
const stats = await generateThreatStats(
collectionResult,
validation.valid,
whitelistStats,
validation.validation_stats
);
return {
threats: validation.valid, // Only return fresh, validated threats
stats // Complete processing statistics
};
}
Processing Pipeline Explained:
- Enhancement: Convert raw IPs to rich threat objects with scores and categories
- Whitelist Filtering: Apply safety protections to prevent false positives
- Data Validation: Remove expired/stale threat intelligence
- Statistics Generation: Create comprehensive metrics for monitoring
Step 8: Data Storage Management
We implement optimized data storage because threat intelligence datasets can become very large (tens of thousands of IPs), and our API needs to respond quickly to threat lookups. By separating active threats from whitelisted ones and storing only the top-priority threats in our main API cache, we can deliver sub-100ms response times even with large datasets. The chunking strategy handles cases where we exceed Cloudflare KV's 25MB value limits, and the automatic expiry ensures our storage doesn't grow indefinitely. This storage design prioritizes API performance while managing costs effectively.
The storage module efficiently persists processed threats in KV storage with performance optimizations.
import type { Env, EnhancedThreatIP, ThreatIntelligenceStats } from "../types";
import { SCORING_CONFIG } from "./scoring";
// Efficiently stores processed threat intelligence with performance optimizations
export async function storeResults(result: { threats: EnhancedThreatIP[]; stats: ThreatIntelligenceStats }, env: Env): Promise<void> {
const activeThreats: EnhancedThreatIP[] = [];
const whitelistedThreats: EnhancedThreatIP[] = [];
// Separate threats by status for optimized storage and retrieval
for (const threat of result.threats) {
if (threat.is_whitelisted) {
whitelistedThreats.push(threat);
} else {
activeThreats.push(threat); // These are the actionable threats
}
}
// Handle large datasets with chunking (KV has 25MB value limit)
const maxChunkSize = 10000; // Store max 10k threats per key to stay under KV limits
try {
// Store enhanced threats in chunks
if (result.threats.length > maxChunkSize) {
for (let i = 0; i < result.threats.length; i += maxChunkSize) {
const chunk = result.threats.slice(i, i + maxChunkSize);
const chunkIndex = Math.floor(i / maxChunkSize);
await env.THREAT_INTEL.put(`threats:enhanced:${chunkIndex}`, JSON.stringify(chunk), {
expirationTtl: SCORING_CONFIG.max_age_hours * 60 * 60
});
}
// Store metadata about chunks
await env.THREAT_INTEL.put('threats:enhanced:meta', JSON.stringify({
total_chunks: Math.ceil(result.threats.length / maxChunkSize),
total_threats: result.threats.length,
chunk_size: maxChunkSize
}));
} else {
await env.THREAT_INTEL.put('threats:enhanced', JSON.stringify(result.threats), {
expirationTtl: SCORING_CONFIG.max_age_hours * 60 * 60
});
}
// Store active threats (limit to top 50k by score for performance)
const topActiveThreats = activeThreats
.sort((a, b) => b.score - a.score)
.slice(0, 50000);
await env.THREAT_INTEL.put('threats:active', JSON.stringify(topActiveThreats), {
expirationTtl: SCORING_CONFIG.max_age_hours * 60 * 60
});
// Store whitelisted (usually much smaller)
await env.THREAT_INTEL.put('threats:whitelisted', JSON.stringify(whitelistedThreats));
// Store comprehensive stats
await env.THREAT_INTEL.put('stats:comprehensive', JSON.stringify(result.stats));
await env.THREAT_INTEL.put('last_update', result.stats.collection.last_run);
console.log(`Stored ${result.threats.length} total threats, ${topActiveThreats.length} active threats`);
} catch (error) {
console.error('Failed to store enhanced results:', error);
throw error;
}
}
Storage Optimization Features:
- Chunked Storage: Handle large datasets within KV 25MB limits
- Performance Separation: Active threats stored separately for fast API access
- Automatic Expiry: TTL ensures data freshness without manual cleanup
- Audit Trail: Whitelisted threats stored for compliance and debugging
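For the read side of the chunking scheme, a hypothetical retrieval helper might reassemble the chunks like this (a sketch under the key names used by storeResults above):
import type { Env, EnhancedThreatIP } from '../types';
// Hypothetical sketch: load threats written by storeResults, chunked or not
export async function loadEnhancedThreats(env: Env): Promise<EnhancedThreatIP[]> {
const meta = await env.THREAT_INTEL.get('threats:enhanced:meta', 'json') as { total_chunks: number } | null;
if (!meta) {
// Small-dataset path: everything lives under a single key
return (await env.THREAT_INTEL.get('threats:enhanced', 'json') as EnhancedThreatIP[] | null) ?? [];
}
const threats: EnhancedThreatIP[] = [];
for (let i = 0; i < meta.total_chunks; i++) {
const chunk = await env.THREAT_INTEL.get(`threats:enhanced:${i}`, 'json') as EnhancedThreatIP[] | null;
if (chunk) threats.push(...chunk); // skip chunks that have already expired
}
return threats;
}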
Step 9: Main Scheduler Orchestration
We create a master scheduler because while each individual component (collection, processing, storage) works well on its own, we need orchestration to run them in the correct sequence and handle failures gracefully. The scheduler ensures that data cleanup happens before collection (to free up space), that processing happens after collection is complete, and that storage only occurs if processing succeeds. This coordination prevents data corruption and ensures that our automated system recovers gracefully from temporary failures, like network issues or overloaded threat feeds.
The scheduler coordinates the entire collection workflow and runs automatically every 15 minutes.
import type { Env } from "../types";
import { collectThreatIntelligence } from "./collectThreat";
import { processThreats } from "./processor";
import { storeResults } from "./store";
import { cleanExpiredThreats } from "./validator";
// Main scheduled function that orchestrates the complete threat intelligence workflow
export async function scheduledCollection(env: Env): Promise<void> {
try {
// Step 1: Clean expired data first (maintains performance and data quality)
await cleanExpiredThreats(env);
// Step 2: Collect fresh intelligence from all configured sources
const collectionResult = await collectThreatIntelligence();
// Step 3: Process with enhanced scoring, whitelist filtering, and validation
const processedResult = await processThreats(collectionResult.threats, env);
// Step 4: Store processed results for API consumption
await storeResults(processedResult, env);
console.log('Enhanced collection complete:', {
active_threats: processedResult.stats.whitelist.active_threats,
data_quality: processedResult.stats.data_quality.data_coverage_percentage + '%'
});
} catch (error) {
console.error('Enhanced collection failed:', error);
// Note: Errors are logged but don't crash the scheduler - next run will retry
}
}
Scheduler Workflow Explained:
- Data Cleanup: Remove stale threats to maintain system performance
- Parallel Collection: Fetch from multiple threat feeds simultaneously
- Intelligence Processing: Apply scoring, whitelist filtering, and validation
- Optimized Storage: Store results in multiple KV keys for different use cases
- Error Resilience: Continue operating even if individual steps fail
Step 10: Integrate with Worker Runtime
We integrate with Cloudflare Workers' cron triggers because we need our threat intelligence collection to run automatically every 15 minutes without maintaining any servers or infrastructure. Traditional approaches would require setting up cron jobs on servers, monitoring those servers, and handling server failures. Cloudflare Workers' scheduled events give us reliable, globally distributed automation that runs in 300+ locations worldwide with built-in redundancy. This serverless approach means zero maintenance overhead while ensuring our threat intelligence stays fresh and our system remains highly available.
Finally, we connect our scheduler to the Cloudflare Workers runtime for automatic execution.
import type { Env } from './types';
import { scheduledCollection } from './lib/scheduler';
export default {
//... other code (fetch handler for API endpoints)
// Cloudflare Workers scheduled event handler
// Triggered automatically by cron expression: "*/15 * * * *" (every 15 minutes)
async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise<void> {
console.log('Scheduled collection triggered');
// Execute the complete threat intelligence collection workflow
// This runs the entire pipeline: cleanup → collection → processing → storage
await scheduledCollection(env);
},
};
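For the scheduled handler to fire, the cron schedule must also be declared in the Worker's configuration. Assuming a standard Wrangler setup (the name and namespace ID below are this lab's placeholders), the wrangler.toml would contain something like:
# wrangler.toml
name = "intelligence-collector"
main = "src/index.ts"
compatibility_date = "2024-01-01"
[triggers]
crons = ["*/15 * * * *"] # Fire the scheduled() handler every 15 minutes
[[kv_namespaces]]
binding = "THREAT_INTEL" # Must match env.THREAT_INTEL used in the code
id = "<your-kv-namespace-id>"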
Complete Automated Workflow Summary
Your automated threat intelligence scheduler now performs these steps every 15 minutes:
Execution Flow
15-minute timer → scheduled() → scheduledCollection() → 4-step pipeline
The 4-Step Pipeline
- 🧹 Cleanup: Remove expired threats (maintains data quality)
- 📡 Collection: Fetch from IPsum + EmergingThreats (parallel processing)
- ⚡ Processing: Score threats, apply whitelist protection, validate data
- 💾 Storage: Save to KV storage in optimized format for API access
Key Benefits
- Fully Automated: No manual intervention required
- Error Resilient: Individual failures don't break the entire system
- Performance Optimized: Parallel processing and efficient storage
- Safety First: Whitelist protection prevents blocking critical infrastructure
- Production Ready: Comprehensive logging and statistics for monitoring
Monitoring Your Scheduler
Once deployed, monitor your scheduler through:
- Cloudflare Dashboard: View cron trigger executions and error rates
- Worker Logs: wrangler tail shows real-time execution logs
- API Stats: the /stats endpoint provides comprehensive system health metrics
- KV Storage: Monitor data growth and processing efficiency
The automated scheduler ensures your threat intelligence stays fresh, accurate, and actionable without manual maintenance.
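You can also exercise the scheduled handler locally before deploying. Wrangler's dev mode can expose a test endpoint for cron triggers (the flag and URL below reflect current Wrangler behavior and may vary by version):
# Start the Worker locally with scheduled-event testing enabled
npx wrangler dev --test-scheduled
# Manually fire the scheduled handler (cron expression is URL-encoded)
curl "http://localhost:8787/__scheduled?cron=*/15+*+*+*+*"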