Examples¶
Complete code examples and common patterns for using the Python SDK.
GitHub examples¶
The SDK repository includes a comprehensive examples directory with working code:
Available examples¶
- basic_usage.py - Simple API calls and response handling
- iris_detect_workflow.py - Complete Iris Detect workflow
- iris_investigate_pivot_ns_host.py - Pivoting on infrastructure
- iris_investigate_filter_output.py - Filtering results
- relative_time_inputs.py - Working with time-based queries
- retrieving_all_results_in_paginated_return.py - Pagination handling
Common patterns¶
Basic API call¶
from domaintools import API
# Initialize API
api = API('your_username', 'your_api_key')
# Make a simple call
result = api.domain_profile('domaintools.com')
# Access response
print(result.response()['registrant']['name'])
Error handling¶
from domaintools import API
from domaintools.exceptions import (
BadRequestException,
NotAuthorizedException,
ServiceUnavailableException
)
api = API('your_username', 'your_api_key')
try:
result = api.iris_enrich('example.com')
for domain in result:
print(f"{domain['domain']}: {domain['domain_risk']['risk_score']}")
except BadRequestException as e:
print(f"Bad request: {e.reason['error']['message']}")
except NotAuthorizedException:
print("Authentication failed - check credentials")
except ServiceUnavailableException:
print("Rate limit exceeded or service unavailable")
Batch processing¶
from domaintools import API
api = API('your_username', 'your_api_key')
# Process multiple domains
domains = ['example1.com', 'example2.com', 'example3.com']
# Enrich in batches of 100
batch_size = 100
for i in range(0, len(domains), batch_size):
batch = domains[i:i + batch_size]
results = api.iris_enrich(*batch)
for domain in results:
risk = domain['domain_risk']['risk_score']
print(f"{domain['domain']}: Risk {risk}")
Feed processing¶
from domaintools import API
import time
api = API('your_username', 'your_api_key')
# Initialize session
session_id = 'my-app-nod-feed'
# Poll continuously
while True:
try:
results = api.nod(sessionID=session_id)
for record in results.response():
# Process each domain
print(f"New domain: {record['domain']}")
# Wait before next poll
time.sleep(300) # 5 minutes
except KeyboardInterrupt:
print("Stopping feed processing")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(60)
Pagination¶
from domaintools import API
api = API('your_username', 'your_api_key')
# Initial query with restrictive filters to avoid 10K limit
from datetime import datetime, timedelta
response = api.iris_investigate(
nameserver_host='ns1.google.com',
risk_score_ranges=['90-100'],
create_date_after=(datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d'),
limit=50
)
# Collect all results
all_results = response['results']
search_hash = response.get('search_hash')
# Paginate through remaining results
while response.get('has_more_results') and search_hash:
response = api.iris_investigate(
search_hash=search_hash,
position=response['position']
)
all_results.extend(response['results'])
print(f"Total results: {len(all_results)}")
Multi-product workflow¶
from domaintools import API
api = API('your_username', 'your_api_key')
# Step 1: Get feed data
feed = api.nod(after=-3600)
domains = [r['domain'] for r in list(feed.response())[:100]]
# Step 2: Enrich domains
if domains:
enriched = api.iris_enrich(*domains)
# Step 3: Filter high-risk
high_risk = [
d for d in enriched
if d['domain_risk']['risk_score'] >= 80
]
# Step 4: Investigate infrastructure
for domain_data in high_risk:
domain = domain_data['domain']
infrastructure = api.iris_investigate(domain)
if infrastructure['results']:
ns = infrastructure['results'][0]['name_server']
print(f"{domain}: {ns}")
Async processing¶
import asyncio
from domaintools import API
async def process_domains(domains):
api = API('your_username', 'your_api_key')
# Create async tasks for each domain
tasks = [api.domain_profile(domain) for domain in domains]
# Wait for all to complete
results = await asyncio.gather(*tasks)
# Access results
for result in results:
print(result['domain'])
# Run async function
domains = ['example1.com', 'example2.com', 'example3.com']
asyncio.run(process_domains(domains))
CLI scripting¶
#!/bin/bash
# Process a list of domains
while IFS= read -r domain; do
echo "Processing: $domain"
domaintools iris_enrich "$domain" -f json > "results/${domain}.json"
sleep 1
done < domains.txt
Environment-based configuration¶
import os
from domaintools import API
# Load configuration from environment
config = {
'username': os.environ.get('DOMAINTOOLS_USERNAME'),
'api_key': os.environ.get('DOMAINTOOLS_API_KEY'),
'proxy_url': os.environ.get('DOMAINTOOLS_PROXY'),
'rate_limit': os.environ.get('DOMAINTOOLS_RATE_LIMIT', 'true').lower() == 'true'
}
# Create API instance
api = API(
config['username'],
config['api_key'],
proxy_url=config.get('proxy_url'),
rate_limit=config['rate_limit']
)
Integration examples¶
SIEM integration¶
from domaintools import API
import json
api = API('your_username', 'your_api_key')
def enrich_for_siem(domains):
"""Enrich domains for SIEM ingestion."""
results = api.iris_enrich(*domains)
siem_events = []
for domain in results:
event = {
'domain': domain['domain'],
'risk_score': domain['domain_risk']['risk_score'],
'registrant': domain.get('registrant_contact', {}).get('name', {}).get('value'),
'create_date': domain.get('create_date', {}).get('value'),
'ip_addresses': [ip['address']['value'] for ip in domain.get('ip', [])]
}
siem_events.append(event)
return siem_events
# Use in SIEM pipeline
domains = ['example1.com', 'example2.com']
events = enrich_for_siem(domains)
print(json.dumps(events, indent=2))
Threat hunting workflow¶
from domaintools import API
api = API('your_username', 'your_api_key')
def hunt_infrastructure(nameserver):
"""Hunt for suspicious domains on infrastructure."""
# Find all domains on nameserver with restrictive filters
results = api.iris_investigate(
nameserver_host=nameserver,
risk_score_ranges=['90-100']
)
suspicious_domains = []
for domain in results['results']:
# Get detailed profile
profile = api.domain_profile(domain['domain'])
# Check for indicators
if is_suspicious(profile):
suspicious_domains.append(domain['domain'])
return suspicious_domains
def is_suspicious(profile):
"""Check for suspicious indicators."""
response = profile.response()
# Check domain age
if response.get('create_date'):
# Add age check logic
pass
# Check registrant
if not response.get('registrant', {}).get('name'):
return True
return False
Next steps¶
- Iris Platform - Deep domain analysis
- Threat Feeds - Real-time intelligence
- Lookups and Monitors - Historical data and alerts
- Advanced Features - Workflows and pagination
- Configuration - Rate limits and settings