The Python source code must define the Extension API method named request, which takes two arguments: a request dictionary and an authentication dictionary. It shall return a response in JSON format in which every key is defined in the extension metadata file.
The Python source code must run under Python 2.7.
The request dictionary always has the following format:
{
"entity":
{
"name": "some entity name, ex recordedfuture.com",
"type": "some entity type, ex InternetDomainName"
}
}
An extension will only receive requests for entity types that it declares as supported (read from the supported_types field).
The extension source code is responsible for authenticating against the external service using the information provided in the authentication dictionary. If authentication_fields in extension.json was specified with the keys username and password, as in the example above, the authentication dictionary will contain exactly these keys:
{
"username": "user_name_value",
"password": "password_value"
}
Here, user_name_value and password_value are populated from the data the user has entered on the edit credentials page.
Source code - Example
The following example uses urllib2, but other call methods, such as the Python requests library, are also supported.
import hashlib
import hmac
import json
import urllib2
from datetime import datetime
from recordedfuture_extension_util.extension_util import *
class DTSigner(object):
    """Compute the HMAC-SHA1 signature for a signed DomainTools API request.

    The signed message is the concatenation of the API username, a UTC
    timestamp and the request URI; the API key is the HMAC key.
    """

    def __init__(self, api_username, api_key):
        """Store both credentials as UTF-8 byte strings.

        Args:
            api_username: API username (text or byte string).
            api_key: API key used as the HMAC key (text or byte string).
        """
        self.api_username = self._to_bytes(api_username)
        self.api_key = self._to_bytes(api_key)

    @staticmethod
    def _to_bytes(value):
        """Return ``value`` as UTF-8 bytes; byte strings pass through as-is."""
        # Calling .encode() on a Python 2 byte string first decodes it as
        # ASCII, which fails on non-ASCII data, so only encode real text.
        if isinstance(value, bytes):
            return value
        return value.encode("UTF-8")

    @staticmethod
    def timestamp():
        """Return the current UTC time formatted as YYYY-MM-DDTHH:MM:SSZ."""
        return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')

    def sign(self, timestamp, uri):
        """Return the hex HMAC-SHA1 digest of username + timestamp + uri.

        Args:
            timestamp: Timestamp string, normally from ``timestamp()``.
            uri: Request path being signed, e.g. '/v1/example.com'.
        """
        # Every part is normalised to bytes so byte strings and text are
        # never mixed in the join (the URI is typically text decoded from
        # the JSON request, while the username is already encoded).
        message = b''.join([
            self.api_username,
            self._to_bytes(timestamp),
            self._to_bytes(uri),
        ])
        return hmac.new(self.api_key, message, digestmod=hashlib.sha1).hexdigest()
def request(request_dict, auth_info):
    """Extension API entry point: look up an entity in DomainTools.

    Args:
        request_dict: Request of the form
            {"entity": {"name": ..., "type": ...}}.
        auth_info: Dictionary with "username" and "password" keys, or None.

    Returns:
        The transformed DomainTools data for the entity. When credentials
        are missing or incomplete, a dict holding a "community_link" to the
        free DomainTools WHOIS listing is returned instead.

    Raises:
        UnsupportedTypeException: If credentials are present but the entity
            type is neither "InternetDomainName" nor "IpAddress".
        NoResultException: Propagated from the mandatory lookups (domain
            profile, or hosted domains for an IP) when nothing is found.
    """
    entity = request_dict["entity"]
    term = entity["name"]
    entity_type = entity["type"]

    has_credentials = (
        auth_info is not None
        and "username" in auth_info
        and "password" in auth_info
    )
    if not has_credentials:
        # No auth provided: return a link to the free community listing
        # rather than raising MissingCredentialsException.
        return {
            "community_link": {
                "url": "https://whois.domaintools.com/{}".format(term),
                "name": "DomainTools WHOIS listing"
            }
        }

    username = auth_info["username"]
    password = auth_info["password"]

    if entity_type == "InternetDomainName":
        # No try block on this call. If there is no profile, the
        # NoResultException propagates to the caller.
        profile_uri = '/v1/{0}'.format(term)
        profile_response = request_domaintools(profile_uri, username, password, None)

        # Parsed whois is optional: proceed without it when not found.
        parsed_whois_uri = '/v1/{0}/whois/parsed'.format(term)
        whois_response = None
        try:
            whois_response = request_domaintools(parsed_whois_uri, username, password, None)
        except NoResultException:
            pass

        # Hosting history is optional as well.
        hosting_history_uri = '/v1/{0}/hosting-history'.format(term)
        hosting_history_response = None
        try:
            hosting_history_response = request_domaintools(hosting_history_uri, username, password, None)
        except NoResultException:
            pass

        ## Uncomment these lines for local debugging
        ##log("domain-tools-idn", "domain tools profile response: {0}".format(profile_response), "debug")
        ##log("domain-tools-idn", "domain tools parsed whois response: {0}".format(whois_response), "debug")
        ##log("domain-tools-idn", "domain tools hosting history response: {0}".format(hosting_history_response), "debug")
        return transform_idn_response(profile_response, whois_response, hosting_history_response, term)

    if entity_type == "IpAddress":
        # No try block on this call. If there are no hosted domains, the
        # NoResultException propagates to the caller.
        host_domains_uri = '/v1/{}/host-domains'.format(term)
        host_domains_response = request_domaintools(host_domains_uri, username, password, "limit=20")
        ## Uncomment these lines for local debugging
        ##log("domain-tools-ip", "domain tools host domains response: {0}".format(host_domains_response), "debug")
        return transform_ip_response(host_domains_response, term)

    raise UnsupportedTypeException("Type {} not supported".format(entity_type))
def request_domaintools(uri, username, key, params):
    """Perform a signed GET against the DomainTools API and decode the JSON.

    Args:
        uri: API path, e.g. '/v1/example.com/whois/parsed'. It is part of
            the signed message, so it must match the path that is requested.
        username: DomainTools API username.
        key: DomainTools API key used to sign the request.
        params: Extra query string (without a leading '&') to append, or
            None.

    Returns:
        The decoded JSON response body.

    Raises:
        AuthenticationFailedException: On HTTP 403.
        NoResultException: On HTTP 404.
        ExtensionApiException: On any other HTTP error status.
    """
    signer = DTSigner(username, key)
    timestamp = signer.timestamp()
    signature = signer.sign(timestamp, uri)
    host = 'api.domaintools.com'
    # NOTE(review): the request goes over plain http; confirm whether the
    # https endpoint should be used instead.
    url = 'http://{0}{1}?api_username={2}&signature={3}&timestamp={4}'.format(
        host, uri, username, signature, timestamp)
    if params is not None:
        url = "&".join([url, params])
    req = urllib2.Request(url)
    try:
        response = urllib2.urlopen(req)
        try:
            return json.loads(response.read())
        finally:
            # Always release the connection, even if decoding fails.
            response.close()
    except urllib2.HTTPError as err:
        if err.code == 403:
            raise AuthenticationFailedException()
        elif err.code == 404:
            raise NoResultException(uri)
        else:
            raise ExtensionApiException("Got http error {} from domain tools".format(err.code))
def conditional_highlight(value, condition):
    """Wrap ``value`` in a highlight-styled dict when ``condition`` is truthy.

    Args:
        value: The value to report.
        condition: Whether the value should be highlighted.

    Returns:
        {"value": value, "style": "highlight"} if ``condition`` is truthy,
        otherwise ``value`` unchanged.
    """
    if condition:
        return {"value": value, "style": "highlight"}
    return value
def transform_idn_response(profile_response, whois_response, hosting_history_response, term):
    """Build the InternetDomainName extension payload from DomainTools data.

    Args:
        profile_response: Decoded domain profile response.
        whois_response: Decoded parsed-whois response, or None.
        hosting_history_response: Decoded hosting-history response, or None.
        term: Domain name that was looked up; used for the screenshot
            history link and for the NoResultException.

    Returns:
        A JSON string holding whichever fields could be filled in, plus a
        "screenshot_history" link.

    Raises:
        NoResultException: If none of the responses contributed any data.
    """
    extension_content = {}

    if "response" in profile_response:
        _add_profile_fields(extension_content, profile_response["response"])

    if whois_response is not None and "response" in whois_response:
        _add_whois_fields(extension_content, whois_response["response"])

    if (hosting_history_response is not None
            and "response" in hosting_history_response
            and "ip_history" in hosting_history_response["response"]):
        _add_ip_history(extension_content,
                        hosting_history_response["response"]["ip_history"])

    if not extension_content:
        log("domain-tools-idn", "No data found", "debug")
        raise NoResultException(term)

    # The screenshot link is only attached when there is other data to show.
    extension_content["screenshot_history"] = {
        "url": "https://research.domaintools.com/research/screenshot-history/{}/#".format(term),
        "name": "Browse screenshot history"
    }
    return json.dumps(extension_content)


def _add_profile_fields(content, profile_dict):
    """Copy server, registrant, registration, website and history data from
    the domain profile into ``content`` (modified in place)."""
    ## Removed seo.score per DT review feedback
    if "server" in profile_dict:
        server = profile_dict["server"]
        if "ip_address" in server:
            content["ip_address"] = server["ip_address"]
        if "other_domains" in server:
            other_domains = server["other_domains"]
            # An IP shared with 50 or more domains is highlighted.
            content["ip_other_domains"] = conditional_highlight(
                "{} other domains hosted on same IP".format(other_domains),
                (other_domains >= 50))

    if "registrant" in profile_dict:
        registrant = profile_dict["registrant"]
        if "name" in registrant:
            content["registrant"] = registrant["name"]
        if "domains" in registrant:
            content["registrant_domains"] = "Registrant has {} domains".format(registrant["domains"])

    if "registration" in profile_dict:
        registration = profile_dict["registration"]
        if "registrar" in registration:
            content["registrar"] = registration["registrar"].lower()
        if "created" in registration:
            created = registration["created"]
            content["created"] = created
            try:
                age = datetime.now() - datetime.strptime(created, '%Y-%m-%d')
            except (TypeError, ValueError):
                # Unparseable creation date: keep the raw value above and
                # just skip the "recently created" highlight.
                age = None
            if age is not None and age.days < 30:
                content["recently_created"] = {"value": "Created {} days ago".format(age.days),
                                               "style": "highlight"}
        ## Removed registration.expires
        ## Removed registration.updated per DT review feedback

    if "website_data" in profile_dict:
        website_data = profile_dict["website_data"]
        website = {}
        if "title" in website_data:
            website["title"] = website_data["title"]
        if "response_code" in website_data:
            website["response_code"] = website_data["response_code"]
        if "server" in website_data:
            website["server_type"] = website_data["server"]
        if website:
            content["website"] = website

    if "history" in profile_dict:
        history = profile_dict["history"]
        history_res = {}
        if ("name_server" in history and "events" in history["name_server"]
                and history["name_server"]["events"] > 0
                and "timespan_in_years" in history["name_server"]):
            history_res["hosting_history"] = "{0} events in {1} years".format(
                history["name_server"]["events"],
                history["name_server"]["timespan_in_years"])
        if ("ip_address" in history and "events" in history["ip_address"]
                and history["ip_address"]["events"] > 0
                and "timespan_in_years" in history["ip_address"]):
            history_res["ip_history"] = "{0} events in {1} years".format(
                history["ip_address"]["events"],
                history["ip_address"]["timespan_in_years"])
        if "registrar" in history and "events" in history["registrar"]:
            history_res["registrar_history"] = history["registrar"]["events"]
        if ("whois" in history and "records" in history["whois"]
                and history["whois"]["records"] > 0
                and "earliest_event" in history["whois"]):
            history_res["whois_history"] = "{0} records have been archived since {1}".format(
                history["whois"]["records"],
                history["whois"]["earliest_event"])
        if history_res:
            content["history"] = history_res

    if "name_servers" in profile_dict:
        content["name_servers"] = get_name_servers(profile_dict["name_servers"])


def _add_whois_fields(content, whois_dict):
    """Add contact e-mails, the registrant address and name servers from the
    parsed whois record to ``content`` (modified in place)."""
    if "parsed_whois" not in whois_dict:
        return
    parsed_whois = whois_dict["parsed_whois"]

    if "contacts" in parsed_whois:
        contacts = parsed_whois["contacts"]
        # Deduplicated, sorted list of every non-empty contact e-mail.
        content["email"] = sorted({contact["email"]
                                   for contact in contacts.values()
                                   if contact.get("email")})
        if "registrant" in contacts:
            reg = contacts["registrant"]
            reg_address = [reg[key] for key in ("name", "org") if key in reg]
            if "street" in reg:
                reg_address.extend(reg["street"])
            reg_address.extend(reg[key]
                               for key in ("city", "state", "postal", "country")
                               if key in reg)
            if reg_address:
                content["registrant_address"] = ", ".join(reg_address)

    if "name_servers" in parsed_whois:
        # The profile may not have supplied any name servers, so start from
        # an empty list instead of assuming the key already exists.
        merged = set(content.get("name_servers", []))
        merged.update(parsed_whois["name_servers"])
        if merged or "name_servers" in content:
            content["name_servers"] = sorted(merged)


def _add_ip_history(content, ip_history):
    """Store the deduplicated, sorted non-empty "post_ip" values under
    content["history"]["ip_history_details"] (modified in place)."""
    post_ips = {entry["post_ip"]
                for entry in ip_history
                if "post_ip" in entry
                and entry["post_ip"] is not None
                and entry["post_ip"] != ""}
    content.setdefault("history", {})["ip_history_details"] = sorted(post_ips)
def get_name_servers(name_servers):
    """Return the lower-cased "server" value of each name-server entry.

    Args:
        name_servers: Iterable of dicts; entries without a "server" key are
            skipped.

    Returns:
        A list of lower-cased server names, in input order.
    """
    return [entry["server"].lower() for entry in name_servers if "server" in entry]
def transform_ip_response(host_domains_response, term):
    """Build the IpAddress extension payload from a host-domains response.

    Args:
        host_domains_response: Decoded host-domains response.
        term: IP address that was looked up; used for the
            NoResultException.

    Returns:
        A JSON string with "host_domain_count" and/or "host_domain_names"
        (lower-cased), depending on what the response contains.

    Raises:
        NoResultException: If the response carried neither field.
    """
    extension_content = {}
    if "response" in host_domains_response and "ip_addresses" in host_domains_response["response"]:
        host_dict = host_domains_response["response"]["ip_addresses"]
        if "domain_count" in host_dict:
            extension_content["host_domain_count"] = host_dict["domain_count"]
        if "domain_names" in host_dict:
            extension_content["host_domain_names"] = [
                domain.lower() for domain in host_dict["domain_names"]
            ]

    if not extension_content:
        # Logged under the IP tag so it is not mistaken for a domain lookup.
        log("domain-tools-ip", "No data found", "debug")
        raise NoResultException(term)
    return json.dumps(extension_content)
if __name__ == "__main__":
    # Local test entry point: hands `request` to the util package's
    # make_request helper and prints whatever it returns.
    # NOTE(review): make_request presumably builds a sample request and
    # credentials itself - confirm against recordedfuture_extension_util.
    from recordedfuture_extension_util.extension_util import make_request
    # Single-argument print works as a statement (2.7) and a function (3.x).
    print("response: %s" % (make_request(request),))
Error handling
There is a set of predefined extension exceptions that should be used in error cases. These exceptions are handled by the extension API and transformed into error messages displayed in the intelligence card. The available exceptions are:
- MissingCredentialsException()
- Use this if the request method receives no credential data, or credential data that does not contain the expected fields.
- AuthenticationFailedException()
- Use this if the authentication against the external service fails.
- UnsupportedTypeException(type_string)
- Use this if extension request is made for an unsupported RF metadata type.
- NoResultException(search_term, link (optional))
- Use this if search term produced no result.
- ExtensionApiException(error_message, link (optional))
- Use this for all other error cases.
For the exceptions that take an optional link argument, the link gives the user a URL to a page showing how to solve the problem.