Skip to content

Examples

Complete code examples and common patterns for using the Python SDK.

GitHub examples

The SDK repository includes a comprehensive examples directory with working code:

View examples on GitHub

Available examples

Common patterns

Basic API call

from domaintools import API

# Build a client from the account credentials
api = API('your_username', 'your_api_key')

# Request the profile of a single domain
result = api.domain_profile('domaintools.com')

# Drill into the response payload and show the registrant's name
payload = result.response()
registrant = payload['registrant']
print(registrant['name'])

Error handling

from domaintools import API
from domaintools.exceptions import (
    BadRequestException,
    NotAuthorizedException,
    ServiceUnavailableException
)

api = API('your_username', 'your_api_key')

# Both the call and the iteration over its result sit inside the try block,
# so an SDK exception raised at either point reaches the handlers below.
try:
    enriched = api.iris_enrich('example.com')
    for record in enriched:
        name = record['domain']
        score = record['domain_risk']['risk_score']
        print(f"{name}: {score}")
except BadRequestException as exc:
    # Report the message carried in the exception's `reason` payload
    detail = exc.reason['error']['message']
    print(f"Bad request: {detail}")
except NotAuthorizedException:
    print("Authentication failed - check credentials")
except ServiceUnavailableException:
    print("Rate limit exceeded or service unavailable")

Batch processing

from domaintools import API

api = API('your_username', 'your_api_key')

# Domains to enrich
domains = ['example1.com', 'example2.com', 'example3.com']

# Walk the list in slices of at most `batch_size` domains,
# issuing one iris_enrich call per slice.
batch_size = 100
offset = 0
while offset < len(domains):
    batch = domains[offset:offset + batch_size]
    offset += batch_size

    for record in api.iris_enrich(*batch):
        risk = record['domain_risk']['risk_score']
        print(f"{record['domain']}: Risk {risk}")

Feed processing

from domaintools import API
import time

api = API('your_username', 'your_api_key')

# Session identifier sent with every poll
# NOTE(review): presumably lets the feed resume where the previous poll
# stopped - confirm against the feed documentation.
session_id = 'my-app-nod-feed'

# Seconds to wait after a successful poll and after a failed one
POLL_INTERVAL = 300  # 5 minutes
ERROR_BACKOFF = 60

# Poll continuously until interrupted with Ctrl-C.
# The KeyboardInterrupt handler wraps the whole loop so that an interrupt
# arriving during the error back-off sleep also stops cleanly instead of
# escaping as a traceback.
try:
    while True:
        try:
            results = api.nod(sessionID=session_id)

            for record in results.response():
                # Process each domain
                print(f"New domain: {record['domain']}")

            # Wait before next poll
            time.sleep(POLL_INTERVAL)

        except Exception as e:
            # Best-effort loop: report the failure, pause, then poll again.
            # KeyboardInterrupt is not an Exception subclass, so it passes
            # through to the outer handler.
            print(f"Error: {e}")
            time.sleep(ERROR_BACKOFF)
except KeyboardInterrupt:
    print("Stopping feed processing")

Pagination

from domaintools import API

from datetime import datetime, timedelta

api = API('your_username', 'your_api_key')

# Only look at domains created during the last 30 days
created_after = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')

# Initial query: keep the filters restrictive to avoid the 10K limit
response = api.iris_investigate(
    nameserver_host='ns1.google.com',
    risk_score_ranges=['90-100'],
    create_date_after=created_after,
    limit=50,
)

# Accumulator for every page, plus the hash used for follow-up requests
all_results = response['results']
search_hash = response.get('search_hash')

# Keep fetching while the latest page reports more results
while True:
    more_pages = response.get('has_more_results') and search_hash
    if not more_pages:
        break

    response = api.iris_investigate(
        search_hash=search_hash,
        position=response['position'],
    )
    all_results.extend(response['results'])

print(f"Total results: {len(all_results)}")

Multi-product workflow

from domaintools import API

from itertools import islice

api = API('your_username', 'your_api_key')

# Step 1: Get feed data
# islice stops after the first 100 records instead of materializing the
# entire feed response only to slice it afterwards.
# NOTE(review): after=-3600 is presumably a relative offset in seconds
# (the last hour) - confirm against the feed documentation.
feed = api.nod(after=-3600)
domains = [r['domain'] for r in islice(feed.response(), 100)]

# Step 2: Enrich domains (skipped entirely when the feed returned nothing)
if domains:
    enriched = api.iris_enrich(*domains)

    # Step 3: Filter high-risk (risk score of 80 or above)
    high_risk = [
        d for d in enriched
        if d['domain_risk']['risk_score'] >= 80
    ]

    # Step 4: Investigate infrastructure
    for domain_data in high_risk:
        domain = domain_data['domain']
        infrastructure = api.iris_investigate(domain)

        # Only the first result's name server data is reported
        if infrastructure['results']:
            ns = infrastructure['results'][0]['name_server']
            print(f"{domain}: {ns}")

Async processing

import asyncio
from domaintools import API

async def process_domains(domains):
    """Call domain_profile for every name in *domains* and print each result.

    Args:
        domains: Iterable of domain names to look up.
    """
    client = API('your_username', 'your_api_key')

    # One pending domain_profile call per domain
    pending = []
    for name in domains:
        pending.append(client.domain_profile(name))

    # Await them together; gather returns results in argument order
    profiles = await asyncio.gather(*pending)

    # Access results
    for profile in profiles:
        print(profile['domain'])


# Drive the coroutine to completion
domains = ['example1.com', 'example2.com', 'example3.com']
asyncio.run(process_domains(domains))

CLI scripting

#!/bin/bash
#
# Enrich every domain listed in domains.txt (one per line) and write each
# result to results/<domain>.json.

# The output redirect below fails if the target directory does not exist.
mkdir -p results

# Process a list of domains.
# `|| [[ -n "$domain" ]]` keeps the final line when the file has no
# trailing newline (read returns non-zero there but still fills $domain).
while IFS= read -r domain || [[ -n "$domain" ]]; do
    # Skip blank lines so the CLI is never called with an empty argument.
    [[ -z "$domain" ]] && continue

    echo "Processing: $domain"
    domaintools iris_enrich "$domain" -f json > "results/${domain}.json"
    sleep 1  # pause between requests
done < domains.txt

Environment-based configuration

import os
from domaintools import API

# Read all settings from the process environment
env = os.environ

# Rate limiting stays on unless DOMAINTOOLS_RATE_LIMIT is set to something
# other than "true" (compared case-insensitively).
rate_limit_flag = env.get('DOMAINTOOLS_RATE_LIMIT', 'true')

config = dict(
    username=env.get('DOMAINTOOLS_USERNAME'),
    api_key=env.get('DOMAINTOOLS_API_KEY'),
    proxy_url=env.get('DOMAINTOOLS_PROXY'),
    rate_limit=rate_limit_flag.lower() == 'true',
)

# Build the client from the collected settings; proxy_url is None when
# DOMAINTOOLS_PROXY is unset.
api = API(
    config['username'],
    config['api_key'],
    proxy_url=config['proxy_url'],
    rate_limit=config['rate_limit'],
)

Integration examples

SIEM integration

from domaintools import API
import json

# Module-level client; enrich_for_siem below reads this global
api = API('your_username', 'your_api_key')

def enrich_for_siem(domains, client=None):
    """Enrich domains for SIEM ingestion.

    Args:
        domains: Sequence of domain names to enrich.
        client: Optional object exposing ``iris_enrich(*domains)``.
            Defaults to the module-level ``api`` instance.

    Returns:
        A list with one dict per enriched record, holding the keys
        ``domain``, ``risk_score``, ``registrant``, ``create_date`` and
        ``ip_addresses``. A field missing from a record is reported as
        ``None`` (``ip_addresses`` as an empty list), so one sparse record
        does not abort the whole batch.
    """
    # Nothing to look up; avoid issuing a request without any domain.
    if not domains:
        return []

    if client is None:
        client = api

    siem_events = []
    for record in client.iris_enrich(*domains):
        # Keep only IP entries that actually carry an address value.
        addresses = [
            ip['address']['value']
            for ip in record.get('ip', [])
            if ip.get('address', {}).get('value') is not None
        ]
        siem_events.append({
            'domain': record['domain'],
            'risk_score': record.get('domain_risk', {}).get('risk_score'),
            'registrant': record.get('registrant_contact', {}).get('name', {}).get('value'),
            'create_date': record.get('create_date', {}).get('value'),
            'ip_addresses': addresses,
        })

    return siem_events

# Example run: enrich two domains and dump the resulting events as JSON
domains = ['example1.com', 'example2.com']
events = enrich_for_siem(domains)
serialized = json.dumps(events, indent=2)
print(serialized)

Threat hunting workflow

from domaintools import API

api = API('your_username', 'your_api_key')


def hunt_infrastructure(nameserver):
    """Return the domains on *nameserver* that is_suspicious() flags.

    Queries iris_investigate for domains on the given nameserver host,
    restricted to the 90-100 risk score range, then fetches the
    domain_profile of every hit and keeps the names whose profile
    is_suspicious() reports as suspicious.

    Args:
        nameserver: Nameserver host to search on.

    Returns:
        List of domain names, in the order the search returned them.
    """
    # Restrictive filter: only the highest risk score range
    hits = api.iris_investigate(
        nameserver_host=nameserver,
        risk_score_ranges=['90-100'],
    )

    # One detailed profile lookup per hit; keep the flagged ones
    return [
        hit['domain']
        for hit in hits['results']
        if is_suspicious(api.domain_profile(hit['domain']))
    ]

def is_suspicious(profile):
    """Check for suspicious indicators.

    Args:
        profile: Result object whose ``response()`` method returns the
            profile data as a dict.

    Returns:
        True when the profile carries no registrant name (the key is
        missing, null, or the name is empty), otherwise False.
    """
    response = profile.response()

    # Check domain age
    if response.get('create_date'):
        # TODO: add age check logic; this branch currently has no effect
        pass

    # Check registrant. `or {}` also covers a 'registrant' key that is
    # present but null, which would otherwise raise AttributeError.
    registrant = response.get('registrant') or {}
    return not registrant.get('name')

Next steps

Additional resources