Generic Webhook Destination Setup
Send detection alerts to any HTTP endpoint via POST request with JSON payload.
Overview
The generic webhook destination sends HTTP POST requests containing complete detection information to any URL you specify. This provides maximum flexibility for integrating with custom systems or platforms not directly supported.
Use Cases
- Custom internal security platforms
- Proprietary SIEM or SOAR systems
- Middleware for additional processing
- Custom notification systems
- Integration with unsupported third-party platforms
- Development and testing of alert handling
Setup Steps
1. Prepare Webhook Endpoint
Your webhook endpoint must:
Accept HTTP POST requests
- Method: POST
- Content-Type: application/json
Return Success Status
- Status codes 200-299 indicate success
- Any other status code is treated as a failure
Respond Within Timeout
- Default timeout: 30 seconds
- Configure a longer timeout if needed (maximum: 300 seconds)
Handle JSON Payload
- Parse JSON request body
- Extract detection fields
- Process alert appropriately
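The four requirements above can be sketched with only the Python standard library. This is a minimal illustration rather than a production server: the names `handle_body`, `WebhookHandler`, and `make_server`, the loopback host, and port 8080 are all assumptions made for the example.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def handle_body(raw):
    """Return (status_code, response_dict) for a raw request body."""
    try:
        payload = json.loads(raw)
    except ValueError:
        # Covers malformed JSON and undecodable bytes.
        return 400, {"status": "error", "message": "invalid JSON"}
    if not isinstance(payload, dict):
        return 400, {"status": "error", "message": "expected a JSON object"}
    # Extract detection fields here. Hand slow work to a queue or worker
    # so the response is sent well within the sender's timeout.
    name = payload.get("detection_name")
    return 200, {"status": "success", "detection_name": name}


class WebhookHandler(BaseHTTPRequestHandler):
    """Hypothetical receiver that accepts POST requests with a JSON body."""

    def do_POST(self):
        # Read exactly Content-Length bytes of the request body.
        length = int(self.headers.get("Content-Length") or 0)
        status, body = handle_body(self.rfile.read(length))
        data = json.dumps(body).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)


def make_server(host="127.0.0.1", port=8080):
    """Create (but do not start) the HTTP server."""
    return HTTPServer((host, port), WebhookHandler)
```

Calling `make_server().serve_forever()` starts the receiver. Keeping the decision logic in `handle_body` makes it testable without opening a socket.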
2. Test Endpoint
Test your endpoint accepts POST requests:
curl -i -X POST https://your-endpoint.example.com/webhook \
  -H "Content-Type: application/json" \
  -d '{
    "test": "message",
    "detection_name": "Test Detection"
  }'
Expected: a status code in the 200-299 range (the -i flag prints the response status line).
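The same smoke test can be scripted. The sketch below uses only the Python standard library; the helper names `build_test_request`, `send_test_request`, and `is_success` are hypothetical, and the URL you pass in is the placeholder from the curl command.

```python
import json
import urllib.error
import urllib.request


def build_test_request(url):
    """Build a POST request carrying the same test body as the curl command."""
    body = json.dumps({"test": "message", "detection_name": "Test Detection"})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_test_request(url, timeout=30):
    """Send the test request and return the HTTP status code."""
    try:
        with urllib.request.urlopen(build_test_request(url), timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # urllib raises HTTPError for 4xx/5xx responses; report the code anyway.
        return err.code


def is_success(status):
    """Mirror the rule above: only 200-299 counts as success."""
    return 200 <= status <= 299
```

For example, `is_success(send_test_request("https://your-endpoint.example.com/webhook"))` should evaluate to `True` once the endpoint is reachable.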
3. Configure in Query.ai
Contact your Query.ai administrator to configure the webhook destination with:
Required Configuration:
- Webhook URL
Optional Configuration:
- Timeout in seconds (default: 30)
Payload Format
The webhook receives a JSON payload with all detection fields:
{
  "detection_id": 123,
  "detection_name": "Suspicious Login Attempts",
  "description": "Multiple failed login attempts detected",
  "severity": "HIGH",
  "outcome": "MATCHED",
  "match_count": 5,
  "replay_link": "https://app.query.ai/replay/123",
  "ran_at": "2025-01-15T10:00:00Z",
  "range_start": "2025-01-15T09:00:00Z",
  "range_end": "2025-01-15T10:00:00Z",
  "run_id": "run-456",
  "run_type": "SCHEDULED",
  "errors": [],
  "match_operator": "GREATER_THAN",
  "match_threshold": 0,
  "match_eagerness": "EXHAUSTIVE",
  "match_exhaustiveness": "COMPLETED",
  "search_id": "search-abc-123",
  "trace_id": "1-abc-def"
}
Payload Fields
Detection Metadata:
- detection_id (integer) - Detection configuration ID
- detection_name (string) - Detection name
- description (string) - Detection description (may be empty)
- severity (string) - CRITICAL, HIGH, MEDIUM, or LOW
Execution Details:
- run_id (string) - Unique execution run ID
- run_type (string) - SCHEDULED or MANUAL
- outcome (string) - MATCHED, NOT_MATCHED, or ERROR
- match_count (integer) - Number of matches found
Threshold Configuration:
- match_operator (string) - Comparison operator (e.g., GREATER_THAN, EQUAL_TO)
- match_threshold (integer) - Threshold value
- match_eagerness (string) - EAGER or EXHAUSTIVE
Execution Metadata (optional, may be null):
- match_exhaustiveness (string) - COMPLETED or STOPPED_EARLY
- search_id (string) - FSQL API search identifier for debugging
- trace_id (string) - AWS X-Ray trace identifier for debugging
Timestamps:
- ran_at (string) - ISO 8601 timestamp when the detection executed
- range_start (string) - ISO 8601 timestamp of the query time range start
- range_end (string) - ISO 8601 timestamp of the query time range end
Additional:
- replay_link (string) - URL to replay the query in the Query.ai UI
- errors (array) - Array of error objects (empty if no errors)
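One way to work with these fields is to parse the payload into a typed object as soon as it arrives. The sketch below is illustrative only: the `Detection` class and the `parse_timestamp` and `parse_detection` helpers are hypothetical names, and only a subset of the documented fields is modelled.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp such as 2025-01-15T10:00:00Z."""
    # datetime.fromisoformat() accepts a trailing "Z" only on Python 3.11+,
    # so normalise it to an explicit UTC offset first.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


@dataclass
class Detection:
    detection_id: int
    detection_name: str
    severity: str
    outcome: str
    match_count: int
    replay_link: str
    ran_at: datetime
    range_start: datetime
    range_end: datetime
    # Execution metadata is optional and may be null in the payload.
    match_exhaustiveness: Optional[str] = None
    search_id: Optional[str] = None
    trace_id: Optional[str] = None


def parse_detection(payload: dict) -> Detection:
    """Build a Detection from the decoded JSON payload."""
    return Detection(
        detection_id=payload["detection_id"],
        detection_name=payload["detection_name"],
        severity=payload["severity"],
        outcome=payload["outcome"],
        match_count=payload["match_count"],
        replay_link=payload["replay_link"],
        ran_at=parse_timestamp(payload["ran_at"]),
        range_start=parse_timestamp(payload["range_start"]),
        range_end=parse_timestamp(payload["range_end"]),
        match_exhaustiveness=payload.get("match_exhaustiveness"),
        search_id=payload.get("search_id"),
        trace_id=payload.get("trace_id"),
    )
```

Using `payload.get(...)` for the execution metadata means a missing key and an explicit null both end up as `None`.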
Error Object Format
When outcome is ERROR, the errors array contains error objects:
{
  "errors": [
    {
      "message": "Query timeout exceeded",
      "code": "TIMEOUT",
      "source": "FSQL API"
    }
  ]
}
Example Implementations
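Independently of any web framework, the errors array described under Error Object Format can be reduced to log-friendly lines. This is a sketch: the function name `summarize_errors` and the fallback values used for missing keys are assumptions, not part of the payload contract.

```python
def summarize_errors(payload: dict) -> list:
    """Return one readable line per entry in the payload's errors array."""
    lines = []
    # "errors" is an empty array when nothing failed; treat a missing or
    # null value the same way.
    for err in payload.get("errors") or []:
        code = err.get("code", "UNKNOWN")
        source = err.get("source", "unknown source")
        message = err.get("message", "")
        lines.append(f"[{code}] {source}: {message}")
    return lines
```

A handler might log these lines whenever `outcome` is `ERROR`, before deciding whether to alert on the failure itself.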
Python Flask
from flask import Flask, request, jsonify
import logging

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

@app.route('/webhook', methods=['POST'])
def handle_detection():
    """Handle detection alert webhook."""
    # Parse JSON payload; reject requests whose body is not valid JSON
    data = request.get_json(silent=True)
    if data is None:
        return jsonify({'status': 'error', 'message': 'Invalid JSON'}), 400
    # Extract key fields
    detection_name = data.get('detection_name')
    severity = data.get('severity')
    outcome = data.get('outcome')
    match_count = data.get('match_count')
    replay_link = data.get('replay_link')
    # Log alert
    logging.info(f"Detection Alert: {detection_name}")
    logging.info(f"Severity: {severity}, Outcome: {outcome}")
    logging.info(f"Matches: {match_count}")
    logging.info(f"Replay: {replay_link}")
    # Process based on severity
    if severity in ['CRITICAL', 'HIGH']:
        # Send to incident response system
        create_incident(data)
    # Forward to SIEM
    send_to_siem(data)
    # Return success
    return jsonify({'status': 'success'}), 200

def create_incident(detection):
    """Create incident in ticketing system."""
    # Implementation here
    pass

def send_to_siem(detection):
    """Forward to SIEM."""
    # Implementation here
    pass

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
Node.js Express
const express = require('express');
const app = express();
app.use(express.json());

app.post('/webhook', (req, res) => {
  // Parse payload
  const detection = req.body;
  // Extract key fields
  const {
    detection_name,
    severity,
    outcome,
    match_count,
    replay_link
  } = detection;
  // Log alert
  console.log(`Detection Alert: ${detection_name}`);
  console.log(`Severity: ${severity}, Outcome: ${outcome}`);
  console.log(`Matches: ${match_count}`);
  console.log(`Replay: ${replay_link}`);
  // Process based on severity
  if (severity === 'CRITICAL' || severity === 'HIGH') {
    createIncident(detection);
  }
  // Forward to SIEM
  sendToSIEM(detection);
  // Return success
  res.status(200).json({ status: 'success' });
});

function createIncident(detection) {
  // Implementation here
}

function sendToSIEM(detection) {
  // Implementation here
}

app.listen(8080, () => {
  console.log('Webhook server listening on port 8080');
});
AWS Lambda
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Process detection alert webhook."""
    # Parse payload from API Gateway
    body = json.loads(event['body'])
    # Extract key fields
    detection_name = body.get('detection_name')
    severity = body.get('severity')
    outcome = body.get('outcome')
    match_count = body.get('match_count')
    replay_link = body.get('replay_link')
    # Log alert
    logger.info(f"Detection Alert: {detection_name}")
    logger.info(f"Severity: {severity}, Outcome: {outcome}")
    logger.info(f"Matches: {match_count}")
    # Process based on severity
    if severity in ['CRITICAL', 'HIGH']:
        # Create incident
        create_incident(body)
    # Forward to SIEM
    send_to_siem(body)
    # Return success
    return {
        'statusCode': 200,
        'body': json.dumps({'status': 'success'})
    }

def create_incident(detection):
    """Create incident in ticketing system."""
    # Implementation here
    pass

def send_to_siem(detection):
    """Forward to SIEM."""
    # Implementation here
    pass
Security Considerations
Authentication
The basic webhook destination does not include authentication. To secure your endpoint:
Option 1: API Key in URL
https://your-endpoint.example.com/webhook?api_key=your-secret-key
Option 2: IP Allowlist
- Configure firewall to only accept requests from Query.ai IP addresses
- Contact Query.ai support for IP address list
Option 3: Reverse Proxy with Authentication
- Place endpoint behind reverse proxy (e.g., nginx, API Gateway)
- Configure authentication at proxy level
Option 4: VPN or Private Network
- Deploy endpoint in private network
- Use VPN or AWS PrivateLink for connectivity
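For Option 1, the endpoint still has to check the key it receives. A minimal sketch of that check follows, using only the standard library; the function name `has_valid_api_key` is hypothetical, and the `api_key` parameter name is taken from the example URL above.

```python
import hmac
from urllib.parse import parse_qs, urlsplit


def has_valid_api_key(request_url: str, expected_key: str) -> bool:
    """Check the api_key query parameter against the expected secret."""
    if not expected_key:
        # Refuse to treat an unset secret as a match for a missing key.
        return False
    query = parse_qs(urlsplit(request_url).query)
    supplied = query.get("api_key", [""])[0]
    # compare_digest runs in constant time, so response timing does not
    # reveal how many leading characters of the key were correct.
    return hmac.compare_digest(supplied.encode("utf-8"), expected_key.encode("utf-8"))
```

A handler would typically return 401 or 403 when this check fails, which the sender then treats as a failed delivery.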
HTTPS
Always use HTTPS endpoints:
- Encrypts data in transit
- Prevents man-in-the-middle attacks
- Validates server identity
Example: https://your-endpoint.example.com/webhook (not http://)
Input Validation
Validate webhook payload:
def validate_payload(data):
    """Validate webhook payload structure."""
    required_fields = [
        'detection_id',
        'detection_name',
        'severity',
        'outcome',
        'match_count',
        'replay_link'
    ]
    for field in required_fields:
        if field not in data:
            raise ValueError(f"Missing required field: {field}")
    # Validate severity
    valid_severities = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
    if data['severity'] not in valid_severities:
        raise ValueError(f"Invalid severity: {data['severity']}")
    # Validate outcome
    valid_outcomes = ['MATCHED', 'NOT_MATCHED', 'ERROR']
    if data['outcome'] not in valid_outcomes:
        raise ValueError(f"Invalid outcome: {data['outcome']}")
    return True
Rate Limiting
Implement rate limiting to prevent abuse:
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Assumes `app` is the Flask application created in the Flask example.
# Flask-Limiter 3.x takes key_func as the first argument and app as a keyword.
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["100 per hour"]
)

@app.route('/webhook', methods=['POST'])
@limiter.limit("10 per minute")
def handle_detection():
    # Handle webhook
    pass
Testing
Test with curl
curl -X POST https://your-endpoint.example.com/webhook \
  -H "Content-Type: application/json" \
  -d '{
    "detection_id": 123,
    "detection_name": "Test Detection",
    "severity": "HIGH",
    "outcome": "MATCHED",
    "match_count": 5,
    "replay_link": "https://app.query.ai/replay/123",
    "ran_at": "2025-01-15T10:00:00Z",
    "range_start": "2025-01-15T09:00:00Z",
    "range_end": "2025-01-15T10:00:00Z",
    "run_id": "run-456",
    "run_type": "MANUAL",
    "errors": [],
    "match_operator": "GREATER_THAN",
    "match_threshold": 0,
    "match_eagerness": "EXHAUSTIVE"
  }'
Test with Detection
1. Create a test detection with a low threshold
2. Add the webhook destination
3. Click Run Now
4. Verify that the webhook receives a request
5. Check the endpoint logs for the payload
Troubleshooting
Common Issues
| Error | Cause | Solution |
|---|---|---|
| Connection refused | Endpoint not accessible | Verify URL, check firewall rules |
| Timeout | Endpoint slow to respond | Increase timeout, optimize endpoint |
| 404 Not Found | Incorrect URL path | Verify webhook URL is correct |
| 401 Unauthorized / 403 Forbidden | Authentication or authorization failure | Check API keys, verify credentials |
| 500 Internal Server Error | Endpoint error | Check endpoint logs, fix bugs |
| SSL/TLS errors | Certificate issues | Verify SSL certificate is valid |
Debugging
Enable Logging:
import logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
Log Incoming Requests:
@app.route('/webhook', methods=['POST'])
def handle_detection():
    # Log full request
    logging.debug(f"Headers: {request.headers}")
    logging.debug(f"Body: {request.get_data()}")
    # Process request
    data = request.get_json()
    logging.info(f"Received detection: {data.get('detection_name')}")
    return jsonify({'status': 'success'}), 200
Test Locally:
# Run endpoint locally
python webhook_server.py
# Test with curl
curl -X POST http://localhost:8080/webhook \
  -H "Content-Type: application/json" \
  -d '{"detection_name": "Test"}'
View Query.ai Logs
Contact your Query.ai administrator to review CloudWatch logs:
aws logs tail /aws/lambda/detection-outcome-handler --follow
Look for webhook-related errors in the logs.
Configuration Options
Required
url
- Webhook endpoint URL
- Should use HTTPS (strongly recommended)
- Must accept POST requests
- Must return a 200-299 status code on success
Optional
timeout
- Request timeout in seconds
- Default: 30
- Increase for slow endpoints
- Maximum: 300 (5 minutes)
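These constraints can be checked before a configuration is submitted. The sketch below is an illustration on the receiving team's side, not Query.ai's own validation: the `WebhookConfig` class is hypothetical, and the lower timeout bound of 1 second is an assumption, since only the default (30) and the maximum (300) are documented.

```python
from dataclasses import dataclass
from urllib.parse import urlsplit


@dataclass(frozen=True)
class WebhookConfig:
    url: str
    timeout: int = 30  # documented default, in seconds

    def __post_init__(self):
        parts = urlsplit(self.url)
        if parts.scheme not in ("http", "https") or not parts.netloc:
            raise ValueError(f"Not a valid HTTP(S) URL: {self.url!r}")
        # Upper bound is the documented maximum; the lower bound is assumed.
        if not 1 <= self.timeout <= 300:
            raise ValueError("timeout must be between 1 and 300 seconds")

    @property
    def uses_https(self) -> bool:
        """True when the URL follows the HTTPS recommendation."""
        return urlsplit(self.url).scheme == "https"
```

For example, `WebhookConfig("https://your-endpoint.example.com/webhook")` keeps the default timeout, while `timeout=301` raises `ValueError`.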
Best Practices
- Use HTTPS: Always use HTTPS endpoints for security
- Implement Authentication: Secure endpoint with API keys or IP allowlist
- Validate Input: Validate payload structure and values
- Handle Errors Gracefully: Return appropriate status codes
- Log Requests: Log all incoming requests for debugging
- Monitor Performance: Track response times and error rates
- Implement Retries: Handle transient failures in your endpoint
- Rate Limit: Protect endpoint from abuse
- Test Thoroughly: Test with various payload scenarios
- Document Endpoint: Document expected payload and behavior
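The "Implement Retries" item can be sketched as a small helper that wraps calls your endpoint makes to downstream systems. The name `call_with_retries` and its parameters are hypothetical, and retrying on every exception is a simplification; real code would retry only errors known to be transient.

```python
import time


def call_with_retries(func, *args, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call func(*args), retrying failures with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return func(*args)
        except Exception as exc:  # narrow this to transient errors in real code
            last_error = exc
            if attempt < attempts - 1:
                # Waits base_delay, then 2x, then 4x, and so on.
                sleep(base_delay * (2 ** attempt))
    raise last_error
```

A handler could call `call_with_retries(send_to_siem, data)` in place of `send_to_siem(data)`. Keep the total retry time well under the webhook timeout (30 seconds by default), or move the retries to a background worker and return the 2xx response first.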
Advanced Patterns
Routing by Severity
@app.route('/webhook', methods=['POST'])
def handle_detection():
    data = request.get_json()
    severity = data.get('severity')
    if severity == 'CRITICAL':
        # Page on-call engineer
        page_oncall(data)
    elif severity == 'HIGH':
        # Create incident
        create_incident(data)
    elif severity == 'MEDIUM':
        # Create ticket
        create_ticket(data)
    else:
        # Log only
        log_detection(data)
    return jsonify({'status': 'success'}), 200
Enrichment
@app.route('/webhook', methods=['POST'])
def handle_detection():
    data = request.get_json()
    # Enrich with additional context
    enriched = enrich_detection(data)
    # Forward to SIEM with enrichment
    send_to_siem(enriched)
    return jsonify({'status': 'success'}), 200

def enrich_detection(detection):
    """Add additional context to detection."""
    # Add threat intelligence
    detection['threat_intel'] = lookup_threat_intel(detection)
    # Add asset information
    detection['asset_info'] = lookup_asset_info(detection)
    # Add user context
    detection['user_context'] = lookup_user_context(detection)
    return detection
Fan-Out
@app.route('/webhook', methods=['POST'])
def handle_detection():
    data = request.get_json()
    # Send to multiple destinations
    destinations = [
        send_to_siem,
        send_to_ticketing,
        send_to_slack,
        send_to_metrics
    ]
    for destination in destinations:
        try:
            destination(data)
        except Exception as e:
            logging.error(f"Failed to send to {destination.__name__}: {e}")
    return jsonify({'status': 'success'}), 200
Resources
- HTTP Status Codes - Understanding status codes
- JSON Schema - Validating JSON payloads
- Flask Documentation - Python web framework
- Express Documentation - Node.js web framework