Python Source Code

The python source code must define the Extension API method called request which should take two arguments, a request dictionary and an authentication dictionary. It shall return a response in json format where all keys shall be defined in the extension metadata file.

The python source code must run in python version 2.7.

The request dictionary always has the following format: 

{
  "entity":
  {
    "name": "some entity name, ex recordedfuture.com",
    "type": "some entity type, ex InternetDomainName"
  }
}

An extension will only get requests for entity types that it says it supports (read from supported_types field).

The extension source code is responsible for authenticating against the external service using the information provided in the authentication dictionary. If the authentication_fields in extension.json was specified with the keys username and password like in the example above, the authentication dictionary will contain exactly these keys.
 

{
  "username": "user_name_value",
  "password": "password_value"
}


Where user_name_value and password_value will be populated from the data the user has entered in the edit credentials page.

Source code - Example

The following example uses urllib2 but other call methods like Python requests are also supported 

import hashlib
import hmac
import json
import urllib2
from datetime import datetime

from recordedfuture_extension_util.extension_util import *


class DTSigner(object):
    def __init__(self, api_username, api_key):
        self.api_username = api_username.encode("UTF-8")
        self.api_key = api_key.encode("UTF-8")

    @staticmethod
    def timestamp():
        return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')

    def sign(self, timestamp, uri):
        params = ''.join([self.api_username, timestamp, uri])
        return hmac.new(self.api_key, params, digestmod=hashlib.sha1).hexdigest()


def request(request_dict, auth_info):
    term = request_dict["entity"]["name"]
    type = request_dict["entity"]["type"]

    if auth_info != None and "username" in auth_info and "password" in auth_info:
        username = auth_info["username"]
        password = auth_info["password"]

        if type == "InternetDomainName":

            profile_uri = '/v1/{0}'.format(term)
            # No try block on this call. If no profile, return NoResultException.
            profile_response = request_domaintools(profile_uri, username, password, None)

            parsed_whois_uri = '/v1/{0}/whois/parsed'.format(term)
            whois_response = None
            try:
                whois_response = request_domaintools(parsed_whois_uri, username, password, None)
            except NoResultException:
                ## Optional, proceed
                pass

            hosting_history_uri = '/v1/{0}/hosting-history'.format(term)
            hosting_history_response = None
            try:
                hosting_history_response = request_domaintools(hosting_history_uri, username, password, None)
            except NoResultException:
                ## Optional, proceed
                pass

            ## Uncomment these lines for local debugging
            ##log("domain-tools-idn", "domain tools profile response: {0}".format(profile_response), "debug")
            ##log("domain-tools-idn", "domain tools parsed whois response: {0}".format(whois_response), "debug")
            ##log("domain-tools-idn", "domain tools hosting history response: {0}".format(hosting_history_response), "debug")

            return transform_idn_response(profile_response, whois_response, hosting_history_response, term)

        elif type == "IpAddress":

            host_domains_uri = '/v1/{}/host-domains'.format(term)
            # No try block on this call. If no hosted domains, return NoResultException.
            host_domains_response = request_domaintools(host_domains_uri, username, password, "limit=20")

            ## Uncomment these lines for local debugging
            ##log("domain-tools-ip", "domain tools host domains response: {0}".format(host_domains_response), "debug")

            return transform_ip_response(host_domains_response, term)

        else:
            raise UnsupportedTypeException("Type {} not supported".format(type))

    else:
        # No auth provided, return link to free community listing
        return {
                "community_link": {
                    "url": "https://whois.domaintools.com/{}".format(term),
                    "name": "DomainTools WHOIS listing"
                }
        }
        #raise MissingCredentialsException()


def request_domaintools(uri, username, key, params):
    signer = DTSigner(username, key)
    timestamp = signer.timestamp()
    signature = signer.sign(timestamp, uri)
    host = 'api.domaintools.com'

    url = 'http://{0}{1}?api_username={2}&signature={3}&timestamp={4}'.format(
            host, uri, username, signature, timestamp)
    if params != None:
        url = "&".join([url,params])

    req = urllib2.Request(url)
    try:
        response = urllib2.urlopen(req)
        r = response.read()
        return json.loads(r)
    except urllib2.HTTPError, err:
        if err.code == 403:
            raise AuthenticationFailedException()
        elif err.code == 404:
            raise NoResultException(uri)
        else:
            raise ExtensionApiException("Got http error {} from domain tools".format(err.code))

def conditional_highlight(value, condition):
    if condition == True:
        return {"value": value, "style": "highlight"}
    else:
        return value

def transform_idn_response(profile_response, whois_response, hosting_history_response, term):
    extension_content = {}
    if "response" in profile_response:
        profile_dict = profile_response["response"]
        ## Removed seo.score per DT review feedback

        if "server" in profile_dict:
            if "ip_address" in profile_dict["server"]:
                extension_content["ip_address"] = profile_dict["server"]["ip_address"]
            if "other_domains" in profile_dict["server"]:
                other_domains = profile_dict["server"]["other_domains"]
                extension_content["ip_other_domains"] = conditional_highlight("{} other domains hosted on same IP".format(other_domains),
                                                                              (other_domains >= 50))

        if "registrant" in profile_dict:
            registrant = profile_dict["registrant"]
            if "name" in registrant:
                extension_content["registrant"] = registrant["name"]
            if "domains" in registrant:
                extension_content["registrant_domains"] = "Registrant has {} domains".format(registrant["domains"])

        if "registration" in profile_dict:
            registration = profile_dict["registration"]
            if "registrar" in registration:
                extension_content["registrar"] = registration["registrar"].lower()

            if "created" in registration:
                extension_content["created"] = profile_dict["registration"]["created"]
                daysago = datetime.now() - datetime.strptime(profile_dict["registration"]["created"],'%Y-%m-%d')
                if daysago.days < 30:
                    extension_content["recently_created"] = {"value": "Created {} days ago".format(daysago.days), "style": "highlight"}
                    ## Removed regigration.expires
                    ## Removed registration.updated per DT review feedback

        if "website_data" in profile_dict:
            website = {}
            if "title" in profile_dict["website_data"]:
                website["title"] = profile_dict["website_data"]["title"]
            if "response_code" in profile_dict["website_data"]:
                website["response_code"] = profile_dict["website_data"]["response_code"]
            if "server" in profile_dict["website_data"]:
                website["server_type"] = profile_dict["website_data"]["server"]
            if len(website) > 0:
                extension_content["website"] = website

        if "history" in profile_dict:
            history = profile_dict["history"]
            history_res = {}
            if "name_server" in history and "events" in history["name_server"] and history["name_server"][
                "events"] > 0 and "timespan_in_years" in history["name_server"]:
                history_res["hosting_history"] = "{0} events in {1} years".format(history["name_server"]["events"],
                                                                                  history["name_server"]["timespan_in_years"])
            if "ip_address" in history and "events" in history["ip_address"] and history["ip_address"]["events"] > 0 and "timespan_in_years" in history["ip_address"]:
                history_res["ip_history"] = "{0} events in {1} years".format(history["ip_address"]["events"],
                                                                             history["ip_address"]["timespan_in_years"])
            if "registrar" in history and "events" in history["registrar"]:
                history_res["registrar_history"] = history["registrar"]["events"]
            if "whois" in history and "records" in history["whois"] and history["whois"]["records"] > 0 and "earliest_event" in history["whois"]:
                history_res["whois_history"] = "{0} records have been archived since {1}".format(history["whois"]["records"],
                                                                                                 history["whois"]["earliest_event"])
            if len(history_res) > 0:
                extension_content["history"] = history_res

        if "name_servers" in profile_dict:
            extension_content["name_servers"] = get_name_servers(profile_dict["name_servers"])

    if whois_response != None and "response" in whois_response:
        whois_dict = whois_response["response"]
        if "parsed_whois" in whois_dict and "contacts" in whois_dict["parsed_whois"]:
            contacts = whois_dict["parsed_whois"]["contacts"]

            all_emails = []
            for contact in contacts.values():
                if contact.get("email"):
                    all_emails.append(contact["email"])
            # Dedup
            extension_content["email"] = sorted(list(set(all_emails)))

            if "registrant" in contacts:
                reg = contacts["registrant"]
                reg_address = []
                if "name" in reg:
                    reg_address.append(reg["name"])
                if "org" in reg:
                    reg_address.append(reg["org"])
                if "street" in reg:
                    for s in reg["street"]:
                        reg_address.append(s)
                if "city" in reg:
                    reg_address.append(reg["city"])
                if "state" in reg:
                    reg_address.append(reg["state"])
                if "postal" in reg:
                    reg_address.append(reg["postal"])
                if "country" in reg:
                    reg_address.append(reg["country"])
            if len(reg_address) >0:
                extension_content["registrant_address"] = ", ".join(reg_address)

        if "parsed_whois" in whois_dict and "name_servers" in whois_dict["parsed_whois"]:
            for ns in whois_dict["parsed_whois"]["name_servers"]:
                extension_content["name_servers"].append(ns)
            # Dedup
            extension_content["name_servers"] = sorted(list(set(extension_content["name_servers"])))

    if hosting_history_response != None and "response" in hosting_history_response and "ip_history" in hosting_history_response["response"]:
        ip_history = []
        for h in hosting_history_response["response"]["ip_history"]:
            if "post_ip" in h and h["post_ip"] != None and h["post_ip"] != "":
                ip_history.append(h["post_ip"])
        # Dedup
        ip_history = sorted(list(set(ip_history)))
        if "history" not in extension_content:
            extension_content["history"] = {}
        extension_content["history"]["ip_history_details"] = ip_history

    if len(extension_content) > 0:
        extension_content["screenshot_history"] = { "url": "https://research.domaintools.com/research/screenshot-history/{}/#".format(term),
                                                    "name": "Browse screenshot history" }

    if len(extension_content) == 0:
        log("domain-tools-idn", "No data found", "debug")
        raise NoResultException(term)
    else:
        return json.dumps(extension_content)

def get_name_servers(name_servers):
    return [dict["server"].lower() for dict in name_servers if
            "server" in dict]

def transform_ip_response(host_domains_response, term):
    extension_content = {}
    if "response" in host_domains_response and "ip_addresses" in host_domains_response["response"]:
        host_dict = host_domains_response["response"]["ip_addresses"]

        if "domain_count" in host_dict:
            extension_content["host_domain_count"] = host_dict["domain_count"]

        if "domain_names" in host_dict:
            extension_content["host_domain_names"] = []
            for d in host_dict["domain_names"]:
                extension_content["host_domain_names"].append(d.lower())

    if len(extension_content) == 0:
        log("domain-tools-idn", "No data found", "debug")
        raise NoResultException(term)
    else:
        return json.dumps(extension_content)

if __name__ == "__main__":
    from recordedfuture_extension_util.extension_util import make_request
    print "response:", make_request(request)

Error handling

There are a set of predefined extension exception which should be used in error cases. The exceptions will be handled by the extension api and transformed into error messages displayed in the intelligence card. The available exceptions are:

  • MissingCredentialsException()
    • Use this if the request-method does not contain the expected or no credential data.
  • AuthenticationFailedException()
    • Use this if the authentication against the external service fails.
  • UnsupportedTypeException(type_string)
    • Use this if extension request is made for an unsupported RF metadata type.
  • NoResultException(search_term, link (optional))
    • Use this if search term produced no result.
  • ExtensionApiException(error_message, link (optional))
    • Use this for all other error cases.

The exceptions with an optional link argument is to give the user a URL to a page showing how to solve the problem.

This content is confidential. Do not distribute or download content in a manner that violates your Recorded Future license agreement. Sharing this content outside of licensed Recorded Future users constitutes a breach of the terms and/or agreement and shall be considered a breach by your organization.
Was this article helpful?
0 out of 0 found this helpful

Articles in this section