Building the Intelligence Collector API

Learning Objectives

In this section, you'll build a production-ready API layer that:

  • Exposes threat intelligence data through clean RESTful endpoints
  • Provides real-time system statistics for monitoring and operations
  • Implements proper API best practices including CORS, error handling, and pagination
  • Ensures high performance with caching and optimized data access

Why Build an API Layer?

The API layer transforms our collected threat intelligence into actionable data that other systems can consume. This enables:

  1. Integration with Security Tools - SIEM systems, firewalls, and monitoring platforms
  2. Real-time Threat Lookups - Check IPs against our intelligence database
  3. Operational Visibility - Monitor collection health and data quality
  4. Automated Workflows - Enable other systems to consume our threat data

Architecture Overview

HTTP Requests  →  Router      →  Controllers  →  KV Storage   →  Formatted Response
      ↓              ↓               ↓               ↓                  ↓
CORS Headers      Path Match     Business        Cached Data      JSON API
Rate Limits       Validation     Logic           Fast Access      Standard Format

The API follows clean separation of concerns:

  • Routing handles HTTP basics (CORS, method validation)
  • Controllers implement business logic and data transformation
  • Storage layer provides optimized data access
  • Response formatting ensures consistent API contracts

/ips endpoint - Threat Intelligence Listing

This endpoint provides paginated access to our threat intelligence database with advanced filtering capabilities.

Why This Endpoint Matters

The /ips endpoint is the primary data interface for accessing our threat intelligence. It enables:

  • Security Operations Teams to query threats by confidence level and category
  • SIEM Systems to periodically sync threat data via pagination
  • Automated Tools to filter threats based on specific criteria
  • Monitoring Dashboards to display current threat landscape

Implementation Strategy

This endpoint trades real-time accuracy for performance: it serves the pre-processed "active threats" list instead of querying all data on every request. This design choice means:

  • Faster Response Times - One KV read returns the whole working set, with a target of sub-100ms responses
  • Reduced Compute Costs - Whitelist exclusion and score ranking happen once per collection run, not on every request
  • Better User Experience - Response time depends on the size of the active list, not on the full dataset
  • Scalable Architecture - Per-request work is bounded because the active list is capped (top 50k threats)
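The pre-processing step itself is not shown in this section. As a minimal sketch of how a collection run might build the list stored under `threats:active`, assume a hypothetical `buildActiveThreats` helper and a trimmed-down `ThreatLike` type (neither is from the original code; the 50,000 cap is taken from the "top 50k" note in the controller):

```typescript
// Hypothetical, trimmed-down shape; the real EnhancedThreatIP has more fields.
interface ThreatLike {
  ip: string;
  score: number;
  is_whitelisted: boolean;
}

// Assumed cap, based on the "top 50k" note in the controller comments.
const MAX_ACTIVE_THREATS = 50_000;

// Build the list that would be stored under the 'threats:active' key:
// drop whitelisted entries, order by score (highest first), keep the top N.
function buildActiveThreats<T extends ThreatLike>(
  all: T[],
  maxItems: number = MAX_ACTIVE_THREATS
): T[] {
  return all
    .filter(t => !t.is_whitelisted)
    .sort((a, b) => b.score - a.score) // filter() returned a copy, so `all` is untouched
    .slice(0, maxItems);
}

const sample: ThreatLike[] = [
  { ip: '203.0.113.7', score: 40, is_whitelisted: false },
  { ip: '198.51.100.2', score: 95, is_whitelisted: false },
  { ip: '192.0.2.10', score: 99, is_whitelisted: true },
];
const active = buildActiveThreats(sample, 2);
console.log(active.map(t => t.ip)); // [ '198.51.100.2', '203.0.113.7' ]
```

In a Worker, the collection run would then write the result once per cycle, for example with `env.THREAT_INTEL.put('threats:active', JSON.stringify(active))`, so that every API request pays for a single read only.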
src/controllers/ips.ts
import type { Env, EnhancedThreatIP } from '../types';
import { generateRequestId, createPaginatedResponse, parseQueryParams } from '../lib/utils';
import { ipToInt } from '../lib/ip';

// Main controller for threat intelligence IP listing
// Provides paginated access with filtering and sorting capabilities
export async function handleIPsEndpoint(request: Request, env: Env): Promise<Response> {
  const startTime = Date.now(); // Track processing time for performance monitoring
  const requestId = generateRequestId(); // Unique ID for request tracing and debugging
  const url = new URL(request.url); // Parse URL for query parameters

  try {
    // Extract and validate query parameters (page, limit, filters, sorting)
    const params = parseQueryParams(url);

    // Use pre-processed 'active' threats for optimal performance
    // This contains only non-whitelisted threats, sorted by score (top 50k)
    // KV returns null when the key does not exist yet
    const allThreats = await env.THREAT_INTEL.get('threats:active', 'json') as EnhancedThreatIP[] | null;

    // Handle empty dataset gracefully (system just started or no threats found)
    if (!allThreats || allThreats.length === 0) {
      // Return empty result set with proper pagination metadata
      return new Response(JSON.stringify(createPaginatedResponse(
        [],
        params.page,
        params.limit,
        0,
        startTime,
        requestId
      )), {
        status: 200,
        headers: { 'Content-Type': 'application/json' }
      });
    }

    // Apply client-requested filters in memory (already optimized dataset)
    let filteredThreats = allThreats;

    // Filter out whitelisted IPs unless specifically requested
    // Note: 'threats:active' is already whitelist-free, so include_whitelisted=true
    // only has an effect if the stored list is changed to keep whitelisted entries
    if (!params.include_whitelisted) {
      filteredThreats = filteredThreats.filter(t => !t.is_whitelisted);
    }

    // Filter by confidence level (low, medium, high, very_high)
    if (params.confidence) {
      filteredThreats = filteredThreats.filter(t => t.confidence_level === params.confidence);
    }

    // Filter by risk category (spam, malware, botnet, scanning, unknown)
    if (params.category) {
      filteredThreats = filteredThreats.filter(t => t.risk_category === params.category);
    }

    // Apply sorting (score_desc, score_asc, recent_first, oldest_first, ip_asc)
    // Fall back to the default order instead of asserting that sort is set
    filteredThreats = applySorting(filteredThreats, params.sort ?? 'score_desc');

    // Calculate pagination boundaries
    const total = filteredThreats.length;
    const startIndex = (params.page - 1) * params.limit;
    const endIndex = startIndex + params.limit;
    const paginatedThreats = filteredThreats.slice(startIndex, endIndex);

    // Transform internal data structure to clean public API format
    // Remove internal fields and ensure consistent response shape
    const responseData = paginatedThreats.map(threat => ({
      ip: threat.ip,
      score: threat.score,
      confidence_level: threat.confidence_level,
      risk_category: threat.risk_category,
      sources: threat.sources, // Which feeds flagged this IP
      first_seen: threat.first_seen, // When first observed
      last_seen: threat.last_seen, // Most recent observation
      is_whitelisted: threat.is_whitelisted, // Protection status
      expires_at: threat.expires_at // Data freshness indicator
    }));

    // Create standardized paginated response with metadata
    const response = createPaginatedResponse(
      responseData,
      params.page,
      params.limit,
      total, // Total count before pagination
      startTime, // For processing time calculation
      requestId // For request tracing
    );

    // Return formatted response with caching headers
    return new Response(JSON.stringify(response, null, 2), {
      headers: {
        'Content-Type': 'application/json',
        'Cache-Control': 'public, max-age=60' // Cache for 1 minute (data updates every 15min)
      }
    });

  } catch (error) {
    // Return standardized error response with debugging information
    return new Response(JSON.stringify({
      success: false,
      error: {
        code: 'INTERNAL_ERROR',
        message: 'Failed to retrieve threat data',
        details: error instanceof Error ? error.message : 'Unknown error'
      },
      metadata: {
        timestamp: new Date().toISOString(),
        request_id: requestId,
        processing_time_ms: Date.now() - startTime,
        version: '1.0.0'
      }
    }), {
      status: 500,
      headers: { 'Content-Type': 'application/json' }
    });
  }
}

// Flexible sorting function supporting multiple threat intelligence use cases
// Sorts a copy, because Array.prototype.sort would otherwise reorder the caller's array
export function applySorting(threats: EnhancedThreatIP[], sortParam: string): EnhancedThreatIP[] {
  const sorted = [...threats];
  switch (sortParam) {
    case 'score_asc':
      // Lowest threat score first (least dangerous threats)
      return sorted.sort((a, b) => a.score - b.score);
    case 'recent_first':
      // Most recently seen threats first (fresh intelligence)
      return sorted.sort((a, b) => new Date(b.last_seen).getTime() - new Date(a.last_seen).getTime());
    case 'oldest_first':
      // Oldest threats first (historical analysis)
      return sorted.sort((a, b) => new Date(a.last_seen).getTime() - new Date(b.last_seen).getTime());
    case 'ip_asc':
      // Sort by IP address numerically (network analysis)
      return sorted.sort((a, b) => ipToInt(a.ip) - ipToInt(b.ip));
    case 'score_desc':
    default:
      // Default and safe fallback: Highest threat score first (most dangerous threats)
      return sorted.sort((a, b) => b.score - a.score);
  }
}
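
The `ip_asc` case relies on `ipToInt`, which is imported from `../lib/ip` but not shown in this section. A sketch of one possible implementation, assuming dotted-quad IPv4 input only. It multiplies by 256 instead of using `<<`, because JavaScript bit shifts work on signed 32-bit integers and would turn addresses above 127.255.255.255 into negative numbers, which breaks the subtraction-based comparator:

```typescript
// Convert a dotted-quad IPv4 address to an unsigned integer.
// Assumption: IPv4 only; anything else is rejected with an Error.
function ipToInt(ip: string): number {
  const parts = ip.split('.');
  if (parts.length !== 4) {
    throw new Error(`Not a dotted-quad IPv4 address: ${ip}`);
  }
  let value = 0;
  for (const part of parts) {
    if (!/^\d{1,3}$/.test(part)) {
      throw new Error(`Invalid octet "${part}" in ${ip}`);
    }
    const octet = Number(part);
    if (octet > 255) {
      throw new Error(`Octet out of range in ${ip}`);
    }
    value = value * 256 + octet; // stays positive, unlike (value << 8) | octet
  }
  return value;
}

console.log(ipToInt('10.0.0.1'));    // 167772161
console.log(ipToInt('192.168.1.1')); // 3232235777

const sortedIps = ['192.168.1.1', '10.0.0.1', '172.16.0.5']
  .sort((a, b) => ipToInt(a) - ipToInt(b));
console.log(sortedIps); // [ '10.0.0.1', '172.16.0.5', '192.168.1.1' ]
```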

Key Features of the /ips Endpoint

  1. Performance-Optimized: Uses pre-processed data for sub-100ms responses
  2. Flexible Filtering: Supports confidence levels, risk categories, and whitelist status
  3. Smart Pagination: Efficient pagination with total count and navigation metadata
  4. Multiple Sorting: Score, recency, and IP-based sorting for different use cases
  5. Caching Support: 60-second cache headers reduce load on frequent queries

Query Parameters:

  • page - Page number (default: 1)
  • limit - Items per page (1-1000, default: 100)
  • confidence - Filter by confidence level (low, medium, high, very_high)
  • category - Filter by risk category (spam, malware, botnet, scanning, unknown)
  • sort - Sorting method (score_desc, score_asc, recent_first, oldest_first, ip_asc; default: score_desc)
  • include_whitelisted - Include protected IPs (default: false)
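
The controller delegates parameter handling to `parseQueryParams` from `../lib/utils`, whose body is not shown here. A minimal sketch of what it might look like, using the defaults and ranges listed above. The `QueryParams` shape and the `clampInt` and `pick` helpers are assumptions made for illustration:

```typescript
// Assumed shape of the parsed parameters.
interface QueryParams {
  page: number;
  limit: number;
  confidence: string | undefined;
  category: string | undefined;
  sort: string;
  include_whitelisted: boolean;
}

const CONFIDENCE_LEVELS = ['low', 'medium', 'high', 'very_high'];
const CATEGORIES = ['spam', 'malware', 'botnet', 'scanning', 'unknown'];
const SORT_OPTIONS = ['score_desc', 'score_asc', 'recent_first', 'oldest_first', 'ip_asc'];

// Parse an integer and clamp it to [min, max]; use the fallback when missing or not a number.
function clampInt(raw: string | null, fallback: number, min: number, max: number): number {
  const n = raw === null ? NaN : Number.parseInt(raw, 10);
  if (Number.isNaN(n)) return fallback;
  return Math.min(max, Math.max(min, n));
}

// Accept a value only if it is one of the documented options.
function pick(raw: string | null, allowed: string[]): string | undefined {
  return raw !== null && allowed.includes(raw) ? raw : undefined;
}

function parseQueryParams(url: URL): QueryParams {
  const q = url.searchParams;
  return {
    page: clampInt(q.get('page'), 1, 1, Number.MAX_SAFE_INTEGER),
    limit: clampInt(q.get('limit'), 100, 1, 1000),
    confidence: pick(q.get('confidence'), CONFIDENCE_LEVELS),
    category: pick(q.get('category'), CATEGORIES),
    sort: pick(q.get('sort'), SORT_OPTIONS) ?? 'score_desc',
    include_whitelisted: q.get('include_whitelisted') === 'true',
  };
}

const parsed = parseQueryParams(
  new URL('https://example.com/ips?limit=5000&confidence=high&sort=bogus')
);
console.log(parsed.page, parsed.limit, parsed.confidence, parsed.sort);
// 1 1000 high score_desc
```

This sketch silently falls back to defaults when a value is invalid. A stricter API could instead reject unknown values with a 400 response, which makes client mistakes easier to spot.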

/stats endpoint - System Health and Metrics

Why System Statistics Matter

The /stats endpoint is critical for operational excellence. It enables:

  • Operations Teams to monitor collection health and identify feed failures
  • Security Analysts to understand threat landscape trends and data quality
  • Incident Response to assess system performance during security events
  • Capacity Planning to understand data growth and processing requirements
  • Dashboard Integration for real-time operational visibility

What Statistics We Track

The endpoint provides five key metric categories:

  1. Collection Metrics - Feed health, success/failure rates, processing time
  2. Processing Metrics - Data deduplication, validation, and quality scores
  3. Scoring Metrics - Confidence distribution, risk categorization
  4. Whitelist Metrics - Protection coverage and threat filtering
  5. Data Quality Metrics - Freshness, coverage, and expiry management
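
The controller for this endpoint imports a `ThreatIntelligenceStats` type from `../types`, which is not shown in this section. A sketch of its likely shape, assembled only from the fields that the controller and the example response use. The `data_quality` field is a placeholder, since the source names that category but shows no fields for it, and `validationPassRate` is a hypothetical helper:

```typescript
// Assumed shape, reconstructed from the fields used elsewhere in this section.
interface ThreatIntelligenceStats {
  collection: {
    last_run: string;           // ISO 8601 timestamp of the last collection run
    sources_attempted: number;
    sources_successful: number;
    duration_ms: number;
  };
  processing: {
    unique_ips_processed: number;
    validation_passed: number;
  };
  scoring: {
    confidence_distribution: Record<string, number>; // e.g. { high: 3200, medium: 8900 }
  };
  whitelist: {
    active_threats: number;
    cloudflare_protected: number;
  };
  // Placeholder: field names for this category are not given in the source.
  data_quality?: Record<string, number>;
}

// Share of processed IPs that passed validation, as a percentage
// (0 when nothing was processed, to avoid dividing by zero).
function validationPassRate(stats: ThreatIntelligenceStats): number {
  const { unique_ips_processed, validation_passed } = stats.processing;
  return unique_ips_processed === 0 ? 0 : (validation_passed / unique_ips_processed) * 100;
}

const exampleStats: ThreatIntelligenceStats = {
  // last_run and sources_attempted are made-up sample values
  collection: { last_run: '2025-01-01T00:00:00Z', sources_attempted: 2, sources_successful: 2, duration_ms: 1250 },
  processing: { unique_ips_processed: 15420, validation_passed: 15200 },
  scoring: { confidence_distribution: { high: 3200, medium: 8900 } },
  whitelist: { active_threats: 14800, cloudflare_protected: 15 },
};
console.log(validationPassRate(exampleStats).toFixed(1)); // 98.6
```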
src/controllers/stats.ts
import type { Env, ThreatIntelligenceStats } from '../types';
import { generateRequestId, createAPIResponse } from '../lib/utils';

// Provides comprehensive system statistics and operational metrics
export async function handleStatsEndpoint(env: Env): Promise<Response> {
  const startTime = Date.now();
  const requestId = generateRequestId();

  try {
    // Fetch comprehensive statistics generated during last collection run
    // The two KV reads are independent, so run them in parallel
    const [stats, lastUpdate] = await Promise.all([
      env.THREAT_INTEL.get<ThreatIntelligenceStats>('stats:comprehensive', 'json'),
      env.THREAT_INTEL.get('last_update')
    ]);

    // Handle case where system hasn't run collection yet (initial startup)
    if (!stats) {
      return new Response(JSON.stringify(createAPIResponse(
        null,
        { code: 'NO_STATS', message: 'Statistics not yet available - system initializing' },
        startTime,
        requestId
      )), {
        status: 404,
        headers: { 'Content-Type': 'application/json' }
      });
    }

    // Enhance stored statistics with real-time calculated values
    const enhancedStats = {
      ...stats, // All the stored statistics from collection process
      system: {
        last_update: lastUpdate,
        // Calculate when next collection will occur (every 15 minutes)
        next_collection_in_minutes: 15 - (new Date().getMinutes() % 15),
        // Share of feeds that succeeded in the last run; guarded so that zero
        // attempted sources yields 0 instead of NaN (which JSON serializes as null)
        uptime_percentage: stats.collection.sources_attempted > 0
          ? (stats.collection.sources_successful / stats.collection.sources_attempted) * 100
          : 0,
        // How old is our current data (hours since last collection)
        data_freshness_hours: Math.round(
          (Date.now() - new Date(stats.collection.last_run).getTime()) / (1000 * 60 * 60)
        )
      },
      // Provide API documentation within the stats response (self-documenting API)
      api_info: {
        version: '1.0.0',
        endpoints: [
          'GET /ips - List threats with filtering and pagination',
          'GET /check?ip=X.X.X.X - Check specific IP status',
          'POST /whitelist - Add IP to whitelist',
          'DELETE /whitelist/{ip} - Remove IP from whitelist',
          'GET /stats - This endpoint'
        ],
        rate_limits: {
          general: '1000 requests per minute',
          check: '100 requests per minute per IP'
        }
      }
    };

    // Return formatted statistics with caching for performance
    return new Response(JSON.stringify(createAPIResponse(
      enhancedStats,
      null,
      startTime,
      requestId
    ), null, 2), {
      headers: {
        'Content-Type': 'application/json',
        'Cache-Control': 'public, max-age=60' // Cache stats for 1 minute
      }
    });

  } catch (error) {
    // Return structured error response for debugging
    return new Response(JSON.stringify(createAPIResponse(
      null,
      {
        code: 'STATS_FAILED',
        message: 'Failed to retrieve system statistics',
        details: error instanceof Error ? error.message : 'Unknown error'
      },
      startTime,
      requestId
    )), {
      status: 500,
      headers: { 'Content-Type': 'application/json' }
    });
  }
}

Key Features of the /stats Endpoint

  1. Real-time System Health: Feed success rate (reported as uptime_percentage), next collection time, data freshness
  2. Historical Metrics: Processing performance, success rates, error tracking
  3. Data Quality Indicators: Coverage percentage, expiry management, validation stats
  4. Self-Documenting: Includes API documentation and rate limit information
  5. Monitoring Integration: Structured data perfect for dashboards and alerts

Example Response Structure:

{
  "success": true,
  "data": {
    "collection": { "last_run": "...", "sources_successful": 2, "duration_ms": 1250 },
    "processing": { "unique_ips_processed": 15420, "validation_passed": 15200 },
    "scoring": { "confidence_distribution": { "high": 3200, "medium": 8900 } },
    "whitelist": { "active_threats": 14800, "cloudflare_protected": 15 },
    "system": { "uptime_percentage": 99.2, "next_collection_in_minutes": 8 }
  }
}
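
For the monitoring use case, a consumer might turn the `system` block of this response into alerts. A minimal sketch, where `evaluateHealth` and both thresholds are hypothetical and only the two field names come from the controller:

```typescript
// The two fields of the `system` block that this check reads.
interface SystemHealth {
  uptime_percentage: number;
  data_freshness_hours: number;
}

// Hypothetical thresholds; tune them to your own collection schedule.
const MIN_UPTIME_PERCENTAGE = 90;
const MAX_FRESHNESS_HOURS = 1;

// Return one human-readable alert per violated threshold (empty array = healthy).
function evaluateHealth(system: SystemHealth): string[] {
  const alerts: string[] = [];
  if (system.uptime_percentage < MIN_UPTIME_PERCENTAGE) {
    alerts.push(`Feed success rate is ${system.uptime_percentage}% (minimum ${MIN_UPTIME_PERCENTAGE}%)`);
  }
  if (system.data_freshness_hours > MAX_FRESHNESS_HOURS) {
    alerts.push(`Data is ${system.data_freshness_hours}h old (maximum ${MAX_FRESHNESS_HOURS}h)`);
  }
  return alerts;
}

console.log(evaluateHealth({ uptime_percentage: 99.2, data_freshness_hours: 0 }).length); // 0
console.log(evaluateHealth({ uptime_percentage: 50, data_freshness_hours: 3 }).length);   // 2
```

A dashboard or cron job would fetch `/stats`, parse the JSON body, and pass `body.data.system` to this function.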

Main Router - Bringing It All Together

The main router handles HTTP routing, CORS, and error management for our API endpoints.

Why We Need a Router

The router serves as the entry point for all HTTP requests and provides:

  1. Path-based Routing - Direct requests to the correct controller
  2. CORS Support - Enable browser-based integrations
  3. Error Handling - Consistent error responses across all endpoints
  4. Preflight Handling - Answer OPTIONS requests for CORS preflight checks
  5. Request Tracing - Add correlation IDs and performance metrics
src/index.ts
import type { Env } from './types';
import { handleIPsEndpoint } from './controllers/ips';
import { handleStatsEndpoint } from './controllers/stats';
import { scheduledCollection } from './lib/scheduler';

export default {
  // HTTP request handler - processes all API requests
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    const url = new URL(request.url);
    const path = url.pathname;
    const method = request.method;

    // Configure CORS to allow browser-based API access
    // This enables integration with dashboards, SPAs, and monitoring tools
    const corsHeaders = {
      'Access-Control-Allow-Origin': '*', // Allow all origins
      'Access-Control-Allow-Methods': 'GET, POST, DELETE, OPTIONS', // Supported methods
      'Access-Control-Allow-Headers': 'Content-Type, Authorization' // Allowed headers
    };

    // Copy the CORS headers onto a controller response
    const withCors = (response: Response): Response => {
      Object.entries(corsHeaders).forEach(([key, value]) => {
        response.headers.set(key, value);
      });
      return response;
    };

    // Handle preflight requests (browser sends OPTIONS before actual request)
    if (method === 'OPTIONS') {
      return new Response(null, { headers: corsHeaders });
    }

    try {
      // Route to appropriate controller based on URL path
      switch (path) {
        case '/ips':
          // Handle threat intelligence listing with pagination and filtering
          return withCors(await handleIPsEndpoint(request, env));

        case '/stats':
          // Handle system statistics and health metrics
          return withCors(await handleStatsEndpoint(env));

        default:
          // Return helpful 404 with available endpoints
          return new Response(JSON.stringify({
            success: false,
            error: {
              code: 'NOT_FOUND',
              message: 'Endpoint not found'
            },
            data: {
              available_endpoints: [
                'GET /ips - List threats with filtering and pagination',
                'GET /stats - System health and statistics'
              ]
            }
          }, null, 2), {
            status: 404,
            headers: { ...corsHeaders, 'Content-Type': 'application/json' }
          });
      }
    } catch (error) {
      // Global error handler for any unhandled exceptions
      console.error('Request processing error:', error);
      return new Response(JSON.stringify({
        success: false,
        error: {
          code: 'INTERNAL_ERROR',
          message: 'Internal server error',
          // In production, consider removing detailed error info for security
          details: error instanceof Error ? error.message : 'Unknown error'
        }
      }), {
        status: 500,
        headers: { ...corsHeaders, 'Content-Type': 'application/json' }
      });
    }
  },

  // Scheduled handler - runs the threat intelligence collection pipeline
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise<void> {
    console.log('Scheduled collection triggered at:', new Date().toISOString());
    await scheduledCollection(env);
  },
};
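
The router above dispatches on the path only, so a `POST /ips` would still reach the listing controller. One way to add method validation is a small route table that distinguishes an unknown path (404) from a known path called with the wrong method (405). A sketch, with `ROUTES`, `RouteResult`, and `resolveRoute` as hypothetical names that are not part of the original code:

```typescript
type RouteResult =
  | { kind: 'match'; handler: string }
  | { kind: 'method_not_allowed'; allow: string[] }
  | { kind: 'not_found' };

// Path -> allowed method -> controller name (names taken from the imports above).
const ROUTES: Record<string, Record<string, string>> = {
  '/ips': { GET: 'handleIPsEndpoint' },
  '/stats': { GET: 'handleStatsEndpoint' },
};

function resolveRoute(method: string, path: string): RouteResult {
  // hasOwnProperty guards against paths such as "constructor" matching Object.prototype
  const methods = Object.prototype.hasOwnProperty.call(ROUTES, path) ? ROUTES[path] : undefined;
  if (!methods) {
    return { kind: 'not_found' };
  }
  const handler = methods[method.toUpperCase()];
  if (!handler) {
    // The caller would answer 405 and list these methods in an Allow header
    return { kind: 'method_not_allowed', allow: Object.keys(methods) };
  }
  return { kind: 'match', handler };
}

console.log(resolveRoute('GET', '/ips').kind);   // match
console.log(resolveRoute('POST', '/ips').kind);  // method_not_allowed
console.log(resolveRoute('GET', '/nope').kind);  // not_found
```

In the `fetch` handler, a `method_not_allowed` result would translate into a 405 response whose `Allow` header is built from `allow.join(', ')`.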

Complete API Architecture Summary

Your Intelligence Collector API now provides a production-ready interface with:

🎯 Key API Features

  1. High-Performance Data Access

    • Sub-100ms response times through optimized KV storage patterns
    • Smart caching headers to reduce redundant requests
    • Pre-processed data for immediate availability
  2. Flexible Query Interface

    • Pagination for handling large datasets
    • Multi-dimensional filtering (confidence, category, whitelist status)
    • Multiple sorting options for different use cases
    • Consistent JSON response format with metadata
  3. Operational Excellence

    • Comprehensive system statistics for monitoring
    • Real-time health metrics and data quality indicators
    • Self-documenting API responses
    • Structured error handling with request tracing
  4. Production-Ready Design

    • CORS support for browser-based integrations
    • Standardized API response formats
    • Proper HTTP status codes and error messages
    • Request correlation IDs for debugging

🔗 Integration Examples

Get high-confidence threats:

curl "https://your-worker.workers.dev/ips?confidence=high&limit=50"

Monitor system health:

curl "https://your-worker.workers.dev/stats"

Search for specific threat categories:

curl "https://your-worker.workers.dev/ips?category=malware&sort=recent_first"
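
For the SIEM synchronization use case, a client has to walk through every page. A minimal sketch of such a loop follows. The `PageFetcher` type and `fetchAllThreats` function are hypothetical, and the loop stops when a page comes back shorter than `limit`, so it does not depend on the exact field names inside `pagination`:

```typescript
// Only the part of the /ips response body that this loop reads.
interface ThreatPage {
  data: Array<{ ip: string; score: number }>;
}

// Fetches one page; injected so the loop can be exercised without a network.
type PageFetcher = (page: number, limit: number) => Promise<ThreatPage>;

// Collect the IPs from all pages, with a hard page cap as a safety net.
async function fetchAllThreats(
  fetchPage: PageFetcher,
  limit: number = 1000,
  maxPages: number = 100
): Promise<string[]> {
  const ips: string[] = [];
  for (let page = 1; page <= maxPages; page++) {
    const body = await fetchPage(page, limit);
    for (const item of body.data) {
      ips.push(item.ip);
    }
    if (body.data.length < limit) break; // short or empty page: nothing left to read
  }
  return ips;
}

// Fetcher for the real API (your-worker.workers.dev is the placeholder host used above).
const httpFetcher: PageFetcher = async (page, limit) => {
  const res = await fetch(`https://your-worker.workers.dev/ips?page=${page}&limit=${limit}`);
  if (!res.ok) throw new Error(`GET /ips failed with status ${res.status}`);
  return (await res.json()) as ThreatPage;
};

// Usage: const allIps = await fetchAllThreats(httpFetcher);
```

Because responses are cached for 60 seconds and the data refreshes every 15 minutes, syncing more often than the collection interval gains little.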

📊 Response Format Standards

All API responses follow a consistent structure:

{
  "success": boolean,
  "data": { /* response data */ },
  "pagination": { /* pagination info for list endpoints */ },
  "metadata": {
    "timestamp": "2025-01-XX...",
    "request_id": "req_...",
    "processing_time_ms": 45,
    "version": "1.0.0"
  }
}
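
The helper that produces this envelope for list endpoints, `createPaginatedResponse`, lives in `../lib/utils` and is not shown in this section. A sketch of how it might be written, using the argument order from the controller call. The field names inside `pagination` (`total_pages`, `has_next`, `has_prev`) are assumptions, since the source does not list them:

```typescript
interface ResponseMetadata {
  timestamp: string;
  request_id: string;
  processing_time_ms: number;
  version: string;
}

interface PaginatedResponse<T> {
  success: boolean;
  data: T[];
  // Field names below are assumed; the source only says "pagination info".
  pagination: {
    page: number;
    limit: number;
    total: number;
    total_pages: number;
    has_next: boolean;
    has_prev: boolean;
  };
  metadata: ResponseMetadata;
}

// Wrap one page of results in the standard envelope. Assumes limit >= 1.
function createPaginatedResponse<T>(
  data: T[],
  page: number,
  limit: number,
  total: number,
  startTime: number,
  requestId: string
): PaginatedResponse<T> {
  const totalPages = Math.ceil(total / limit);
  return {
    success: true,
    data,
    pagination: {
      page,
      limit,
      total,
      total_pages: totalPages,
      has_next: page < totalPages,
      has_prev: page > 1,
    },
    metadata: {
      timestamp: new Date().toISOString(),
      request_id: requestId,
      processing_time_ms: Date.now() - startTime,
      version: '1.0.0',
    },
  };
}

const page2 = createPaginatedResponse(['a', 'b'], 2, 100, 250, Date.now(), 'req_example');
console.log(page2.pagination.total_pages, page2.pagination.has_next, page2.pagination.has_prev);
// 3 true true
```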

🚀 Next Steps

Your API is now ready for:

  • SIEM Integration - Periodic threat data synchronization
  • Security Dashboard - Real-time threat landscape visualization
  • Automated Blocking - Feed threat IPs to firewalls and security tools
  • Alerting Systems - Monitor collection health and data quality
  • Compliance Reporting - Generate threat intelligence reports

The combination of automated collection (scheduler) and flexible API access creates a complete threat intelligence platform ready for production security operations.