Python 情報源コード

Python 情報ソース コードは、リクエスト ディクショナリ認証ディクショナリの2 つの引数を取る、 request と呼ばれる拡張 API メソッドを定義する必要があります。 すべてのキーが拡張メタデータ ファイルで定義されている json 形式で応答を返します。

Python 情報ソース コードは、Python バージョン 2.7 で実行する必要があります。

リクエスト辞書は常に次の形式になります。

{
 「エンティティ」:
 {
    "name": "some entity name, ex recordedfuture.com",
    "type": "some entity type, ex InternetDomainName"
  }
}

拡張機能は、サポートしているとされるエンティティ タイプ ( supported_typesフィールドから読み取り) のリクエストのみを取得します。

拡張情報ソース コードは、認証辞書で提供される情報を使用して外部サービスに対して認証を行う役割を担います。 上記の例のように、extension.json のauthentication_fields がusername と password のキーで指定されている場合、認証辞書にはこれらのキーが正確に含まれます。

{
  "username": "user_name_value",
  "password": "password_value"
}


ここで、user_name_value と password_value は、ユーザーが認証情報編集ページで入力したデータから入力されます。

情報ソースコード - 例

次の例ではurllib2を使用していますが、Pythonリクエストなどの他の呼び出し方法もサポートされています。

ハッシュライブラリをインポートする
hmacをインポートする
JSONをインポート
urllib2をインポートする
datetimeからdatetimeをインポート

recordsedfuture_extension_util.extension_utilからインポート*


クラス DTSigner(オブジェクト):
定義__init__ (self, api_username, api_key):
 self.api_ユーザー名= api_username.encode("UTF-8")自己.api_key= api_key.encode("UTF-8")@静的メソッド
タイムスタンプ()の定義:
 datetime.utcnow().strftime(' %Y-% m- %dT% H: %M:% SZ') を返しますdef sign(self, タイムスタンプ, uri):
パラメータ = ''.join([self.api_username,タイムスタンプ、uri])
 hmac.new(self.api_key, を返す)パラメータ、digestmod=hashlib.sha1).hexdigest()リクエストの定義(request_dict、auth_info):
 term = request_dict["エンティティ"]["名前"]
 type = request_dict["エンティティ"]["タイプ"]

 auth_info != の場合なし、auth_info に「username」、auth_info に「password」:
 username = auth_info["ユーザー名"]
パスワード = auth_info["パスワード"]

タイプ == "InternetDomainName" の場合:

 profile_uri = '/v1/ {0} '.format(term)# この呼び出しには try ブロックはありません。プロファイルがない場合は、NoResultException を返します。profile_response = request_domaintools(profile_uri, ユーザー名, パスワード, なし)

 parsed_whois_uri = '/v1/ {0} /whois/parsed'.format(term)whois_response = なし
試す:
 whois_response = request_domaintools(parsed_whois_uri、ユーザー名、パスワード、なし)
 NoResultExceptionを除く:
 ## Optional, proceed
                pass

            hosting_history_uri = '/v1/{0}/hosting-history'.format(term)
            hosting_history_response = None
            try:
                hosting_history_response = request_domaintools(hosting_history_uri, username, password, None)
            except NoResultException:
                ##オプション、続行
合格

## Uncomment these lines for local debugging
            ## log("domain-tools-idn", "ドメイン ツール プロファイルの応答: {0} ".format(profile_response), "デバッグ")
 ##log("domain-tools-idn", "domain tools parsed whois response: {0}".format(whois_response), "debug")
            ## log("domain-tools-idn", "ドメイン ツール ホスティング履歴の応答: {0} ".format(hosting_history_response), "デバッグ")

 transform_idn_response(profile_response, whois_response, hosting_history_response, term) を返します。

 elif type == "IpAddress":

 host_domains_uri = '/v1/{}/host-domains'.format(term)
 # この呼び出しには try ブロックはありません。ホストされているドメインがない場合、NoResultException を返します。host_domains_response = request_domaintools(host_domains_uri, ユーザー名, パスワード, "limit=20")

 ## Uncomment these lines for local debugging
            ## log("domain-tools-ip", "ドメイン ツール ホスト ドメインの応答: {0} ".format(host_domains_response), "デバッグ")

 transform_ip_response(host_domains_response, term) を返します

それ以外:
 UnsupportedTypeException("タイプ {} はサポートされていません".format(type)) が発生しますそれ以外:
 # 認証が提供されていません。無料のコミュニティリストへのリンクを返します
戻る {
 "コミュニティリンク": {
 "url": "https://whois.domaintools.com/{}".format(term),"name": "DomainTools WHOIS リスト"
 }
 }
 #MissingCredentialsException() を発生させる


def request_domaintools(uri, ユーザー名, キー, パラメータ):
署名者 = DTSigner(ユーザー名, キー)
タイムスタンプ = signer.timestamp()署名 = 署名者.sign(タイムスタンプ,ウリ)
ホスト = 'api.domaintools.com'url = 'http:// {0} {1} ?api_username= {2} &signature= {3} &timestamp= {4} '.format(ホスト、URI、ユーザー名、署名、タイムスタンプ)
パラメータ!= Noneの場合:
 url = "&".join([url,params])req = urllib2.Request(url)
試す:
応答 = urllib2.urlopen(req)
        r = レスポンス.read()json.loads(r) を返すurllib2.HTTPError を除く、エラー:
 err.code == 403の場合:
 AuthenticationFailedException() を発生させる
elif err.code == 404:
 NoResultException(uri) を発生させる
それ以外:
 ExtensionApiException を発生させます("ドメイン ツールから http エラー {} が発生しました".format(err.code))def conditional_highlight(値, 条件):
条件 == True の場合:
戻り値{"value": value, "style": "highlight"}
それ以外:
戻り値

def transform_idn_response(profile_response、whois_response、hosting_history_response、term):
 extension_content = {}
 profile_response に「応答」がある場合:
 profile_dict = profile_response["応答"]
 ## Removed seo.score per DT review feedback

        if "server" in profile_dict:
            if "ip_address" in profile_dict["server"]:
                extension_content["ip_address"] = profile_dict["server"]["ip_address"]
            if "other_domains" in profile_dict["server"]:
                other_domains = profile_dict["server"]["other_domains"]
                extension_content["ip_other_domains"] = conditional_highlight("{} other domains hosted on same IP".format(other_domains),
                                                                              (other_domains >= 50))

        if "registrant" in profile_dict:
            registrant = profile_dict["registrant"]
            if "name" in registrant:
                extension_content["registrant"] = registrant["name"]
            if "domains" in registrant:
                extension_content["registrant_domains"] = "Registrant has {} domains".format(registrant["domains"])

        if "registration" in profile_dict:
            registration = profile_dict["registration"]
            if "registrar" in registration:
                extension_content["registrar"] = registration["registrar"].lower()

            if "created" in registration:
                extension_content["created"] = profile_dict["registration"]["created"]
                daysago = datetime.now() - datetime.strptime(profile_dict["registration"]["created"],'%Y-%m-%d')
                if daysago.days < 30:
                    extension_content["recently_created"] = {"value": "Created {} days ago".format(daysago.days), "style": "highlight"}
                    ## regigration.expires を削除しました
## 登録を削除しました。DT レビューのフィードバックに従って更新しました

profile_dictに「website_data」がある場合:
ウェブサイト = {}
 profile_dict["website_data"]に"title"がある場合:
 website["title"] = profile_dict["website_data"]["title"]
 profile_dict["website_data"]に"response_code"がある場合:
 website["response_code"] = profile_dict["website_data"]["response_code"]
 profile_dict["website_data"]に"server"がある場合:
 website["server_type"] = profile_dict["website_data"]["server"]
 len(ウェブサイト) > 0の場合:
 extension_content["ウェブサイト"] = ウェブサイト

profile_dictに「history」がある場合:
 history = profile_dict["履歴"]
 history_res = {}
履歴に「name_server」があり、history["name_server"] と history["name_server"][ に「events」がある場合
history["name_server"]の"events"] > 0かつ"timespan_in_years":
 history_res["hosting_history"] = " {1}年間で{0}イベント".format(history["name_server"]["events"],
                                                                                  history["ネームサーバー"]["期間(年数)"])
履歴に「ip_address」があり、履歴["ip_address"]に「events」があり、履歴["ip_address"]["events"] > 0 であり、履歴["ip_address"]に「timespan_in_years」がある場合:
 history_res["ip_history"] = " {1}年間で{0}イベント".format(history["ip_address"]["events"],history["ip_address"]["timespan_in_years"])
履歴に「registrar」があり、履歴に「events」がある場合["registrar"]:
 history_res["registrar_history"] = history["レジストラ"]["イベント"]
履歴に「whois」があり、history["whois"]に「records」があり、history["whois"]["records"] > 0 であり、history["whois"]に「earliest_event」がある場合:
 history_res["whois_history"] = " {1}以降に{0}レコードがアーカイブされました ".format(history["whois"]["records"],履歴["whois"]["earliest_event"])
 len(history_res) > 0の場合:
 extension_content["history"] = history_res

 profile_dictに「name_servers」がある場合:
 extension_content["name_servers"] = get_name_servers(profile_dict["name_servers"])

 whois_response != の場合なし、whois_response の「応答」:
 whois_dict = whois_response["応答"]
 whois_dictに「parsed_whois」があり、whois_dict["parsed_whois"]に「contacts」がある場合:
連絡先 = whois_dict["parsed_whois"]["連絡先"]

すべてのメール = []
 contacts.values() 内の連絡先の場合:
                contact.get("email") の場合:all_emails.append(連絡先["メールアドレス"])# 重複除去
extension_content["email"] = sorted(list(set(all_emails)))

連絡先に「登録者」が含まれている場合:
 reg = 連絡先["登録者"]
登録アドレス = []
 regに「name」がある場合:
 reg_address.append(reg["名前"])regに「org」がある場合:
 reg_address.append(reg["org"])regに「street」がある場合:
 reg["street"]のsの場合:
 reg_address.append(s)regに「city」がある場合:
 reg_address.append(reg["city"])reg に "state" がある場合:
 reg_address.append(reg["state"])regに「postal」が含まれている場合:
 reg_address.append(reg["郵便"])regに「country」がある場合:
 reg_address.append(reg["国"])len(reg_address) >0の場合:
 extension_content["登録者アドレス"] = ", ".join(登録者アドレス)whois_dictに「parsed_whois」があり、whois_dict["parsed_whois"]に「name_servers」がある場合:
 whois_dict["parsed_whois"]["name_servers"]内のnsについて:
 extension_content["name_servers"].append(ns)# 重複除去
extension_content["name_servers"] = sorted(list(set(extension_content["name_servers"])))

 hosting_history_response != の場合なし、hosting_history_response の「応答」および hosting_history_response["応答"] の「ip_history」:
 ip_history = []
 hosting_history_response["応答"]["ip_history"] の h の場合:
 h に "post_ip" があり、h["post_ip"] != の場合 なし、かつ h["post_ip"] != "":
 ip_history.append(h["post_ip"])# 重複除去
ip_history = ソート済み(リスト(セット(ip_history)))
 extension_contentに「history」がない場合:
 extension_content["history"] = {}
 extension_content["history"]["ip_history_details"] = ip_history

 len(extension_content) > 0の場合:
 extension_content["スクリーンショット履歴"] = { "url": "https://research.domaintools.com/research/screenshot-history/{}/#".format(term),"name": "スクリーンショット履歴を閲覧する" }

 len(extension_content) == 0 の場合:
 log("domain-tools-idn", "データが見つかりません", "debug")
 NoResultException(term) を発生させる
それ以外:
 json.dumps(extension_content) を返すdef get_name_servers(name_servers):
 name_serversのdictに対して[dict["server"].lower()を返す
辞書内の「サーバー」

 def transform_ip_response(host_domains_response, 用語):
 extension_content = {}
 host_domains_response に「応答」があり、host_domains_response に「ip_addresses」がある場合["応答"]:
 host_dict = host_domains_response["応答"]["ip_addresses"]

 host_dictに「domain_count」がある場合:
 extension_content["ホストドメイン数"] = host_dict["ドメイン数"]

 host_dictに「domain_names」がある場合:
 extension_content["ホストドメイン名"] = []
 host_dict["domain_names"]内のdについて:
 extension_content["ホストドメイン名"].append(d.lower())

    len(extension_content) == 0 の場合:
 log("domain-tools-idn", "データが見つかりません", "debug")
 NoResultException(term) を発生させる
それ以外:
 json.dumps(extension_content) を返す

__name__ == " __main__ "の場合:
 recordsedfuture_extension_util.extension_utilからmake_requestをインポート
print "応答:", make_request(request)

エラー処理

エラーの場合に使用する必要のある定義済みの拡張例外のセットがあります。例外は拡張 API によって処理され、インテリジェンス カードに表示されるエラー メッセージに変換されます。利用可能な例外は次のとおりです。

  • 資格情報不足例外()
    • リクエストメソッドに予期された認証情報データが含まれていない場合、または認証情報データがない場合にこれを使用します。
  • 認証失敗例外()
    • 外部サービスに対する認証が失敗した場合にこれを使用します。
  • サポートされていないTypeException(type_string)
    • サポートされていない RF メタデータ タイプに対して拡張要求が行われた場合は、これを使用します。
  • NoResultException(検索語、リンク(オプション))
    • 検索語で結果が得られなかった場合にこれを使用します。
  • ExtensionApiException(error_message, リンク (オプション))
    • 他のすべてのエラーの場合にこれを使用します。

オプションのリンク引数の例外は、問題を解決する方法を示すページへの URL をユーザーに提供することです。

This content is confidential. Do not distribute or download content in a manner that violates your Recorded Future license agreement. Sharing this content outside of licensed Recorded Future users constitutes a breach of the terms and/or agreement and shall be considered a breach by your organization.
この記事は役に立ちましたか?
0人中0人がこの記事が役に立ったと言っています

このセクションの記事