Datalake
Using this library instead of making your own calls to the API has multiple advantages:
- It allows you to get started more quickly than by using the API directly.
- It is maintained directly by Datalake developers, thus reducing the burden of keeping it compatible with the API over time.
- It is open-source, so you can reuse the functionality developed by other Datalake users and help improve this package yourself.
The following will guide you through the configuration of your environment and most common use cases.
With Python 3.6+:
$ pip install datalake-scripts
or
$ pip3 install datalake-scripts
First of all, you will need to create a Datalake instance, which can then be reused to make subsequent calls:
from datalake import Datalake
# Sign in using longterm_token
dtl = Datalake(longterm_token='longterm_token', env='prod')
# or
# Sign in using username/password
dtl = Datalake(username='username', password='password', env='preprod')
While signing in with a long-term token is preferred for security reasons, some endpoints require fresh tokens, i.e. a Datalake instance initiated with a username and password. These endpoints mention this requirement in their description in the API documentation.
Alternatively, you may use environment variables as credentials (they will only be used if you do not set the args when creating the instance):
- OCD_DTL_LONGTERM_TOKEN as an alternative to longterm_token. If set, the username and password environment variables below will be ignored.
- OCD_DTL_USERNAME and OCD_DTL_PASSWORD as an alternative to username and password, respectively. When omitted, the user will be prompted to enter them at runtime.
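The precedence described above can be sketched as follows. This is only an illustration of the stated rules, not the library's own code: the function name resolve_credentials and its return shape are hypothetical, and the real client may resolve edge cases differently.

```python
import os


def resolve_credentials(longterm_token=None, username=None, password=None, environ=None):
    """Hypothetical helper: pick credentials following the precedence described above."""
    environ = os.environ if environ is None else environ
    # Explicit arguments win: environment variables are only a fallback.
    if longterm_token:
        return {"longterm_token": longterm_token}
    if username or password:
        return {"username": username, "password": password}
    # A long-term token in the environment makes the username/password variables irrelevant.
    env_token = environ.get("OCD_DTL_LONGTERM_TOKEN")
    if env_token:
        return {"longterm_token": env_token}
    # Values may be None here: per the text, the user is then prompted at runtime.
    return {
        "username": environ.get("OCD_DTL_USERNAME"),
        "password": environ.get("OCD_DTL_PASSWORD"),
    }


demo_env = {"OCD_DTL_LONGTERM_TOKEN": "env_token", "OCD_DTL_USERNAME": "env_user"}
print(resolve_credentials(environ=demo_env))
```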
The table below lists all the available parameters, along with a brief explanation.
| Parameter | Env variable | Description |
|---|---|---|
| longterm_token | OCD_DTL_LONGTERM_TOKEN | Token to use when connecting to Datalake API. See above. |
| username | OCD_DTL_USERNAME | Username to use when connecting to Datalake API. See above. |
| password | OCD_DTL_PASSWORD | Password to use when connecting to Datalake API. See above. |
| env | - | Datalake environment to use. Allowed values are prod and preprod. Defaults to prod. |
| verify | - | Whether to verify SSL certificate when connecting to Datalake API. Defaults to True. |
| proxies | HTTP_PROXY and HTTPS_PROXY | Configure to use an HTTP/HTTPS proxy. See dedicated section below. |
You may specify an HTTP/HTTPS proxy to use when connecting to the Datalake API by setting the proxies parameter. It uses the format accepted by the "requests" Python library; see its documentation for the other kinds of proxy you can set up.
proxies = {
'http': 'http://127.0.0.1:8080',
'https': 'http://127.0.0.1:8080',
}
dtl = Datalake(longterm_token='longterm_token', proxies=proxies, verify=False)
As stated in the "requests" documentation, environment variables such as HTTP_PROXY and HTTPS_PROXY may be used instead. By default, no proxy is configured.
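If your proxy requires authentication, the "requests" format accepts credentials embedded in the proxy URL. The sketch below builds such a mapping; build_proxies is a hypothetical helper written for this example, and the host, port and credentials are placeholders.

```python
from urllib.parse import quote


def build_proxies(host, port, user=None, password=None):
    """Hypothetical helper: build a proxies mapping in the format accepted by requests."""
    auth = ""
    if user is not None:
        # Percent-encode credentials so characters such as '@' or ':' stay unambiguous.
        auth = quote(user, safe="")
        if password is not None:
            auth += ":" + quote(password, safe="")
        auth += "@"
    url = f"http://{auth}{host}:{port}"
    # The same HTTP proxy URL serves both plain and TLS traffic, as in the example above.
    return {"http": url, "https": url}


print(build_proxies("127.0.0.1", 8080))
```

The resulting dictionary can then be passed as the proxies parameter when creating the Datalake instance.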
Below are some examples to get you started:
- Lookup a threat
- Get threats
- Atom values
- Bulk lookup
- Bulk search
- Add a threat (with all details)
- Bulk add threats at once (atom values only)
- Add tags
- Add comments
- Edit score
- Advanced Search
- Sources
- Sightings
- Search Sightings
- Search Watch
- My Account
For more information on the API endpoints see the API documentation
from datalake import AtomType, Output
dtl.Threats.lookup(
    atom_value='mayoclinic.org',  # Mandatory
    atom_type=AtomType.DOMAIN,
    hashkey_only=False,
    output=Output.JSON
)
The following Output formats are available:
- JSON (Default)
- CSV
- MISP
- STIX
You can retrieve threats with a list of hashkeys:
dtl.Threats.get_threats_with_comments(
    hashkeys=['00000001655688982ec8ba4058f02dd1'],
)
It is possible to get all atom values from a list of source ids and a time range.
Mandatory parameters are:
- source_id
- normalized_timestamp_since
- normalized_timestamp_until
Optional parameters are:
- output
- output_path, default is None, so no output file is created
dtl.Threats.atom_values(
    source_id=["a","b"],
    normalized_timestamp_since="2023-09-14T15:00:00.000Z",
    normalized_timestamp_until="2023-09-15T15:00:00.000Z",
    output=Output.JSON,
    output_path="atom_values.json"
)
The following Output formats are available:
- JSON (Default)
- CSV
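The time range in the atom_values example uses UTC timestamps with millisecond precision and a trailing Z. The sketch below shows one way to produce such strings from Python datetimes; the helper names are hypothetical, and the exact format is inferred from the example above rather than from the API specification.

```python
from datetime import datetime, timedelta, timezone


def to_api_timestamp(moment):
    """Format a timezone-aware datetime like '2023-09-14T15:00:00.000Z' (format inferred from the example)."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    utc = moment.astimezone(timezone.utc)
    return utc.isoformat(timespec="milliseconds").replace("+00:00", "Z")


def last_hours_window(hours, now=None):
    """Return (since, until) strings covering the last `hours` hours."""
    now = now or datetime.now(timezone.utc)
    return to_api_timestamp(now - timedelta(hours=hours)), to_api_timestamp(now)


since, until = last_hours_window(24, now=datetime(2023, 9, 15, 15, 0, tzinfo=timezone.utc))
print(since, until)
```

The two strings can then be passed as normalized_timestamp_since and normalized_timestamp_until.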
Compared to lookup, the bulk_lookup method lets you look up big batches of values faster, as fewer API calls are made. However, fewer output types are supported (only JSON and CSV as of now).
threats = [
    'mayoclinic.org',
    'commentcamarche.net',
    'gawker.com'
]
dtl.Threats.bulk_lookup(
    atom_values=threats,
    atom_type=AtomType.DOMAIN,
    hashkey_only=False,
    output=Output.CSV,
    return_search_hashkey=False
)
The following Output formats are available:
- JSON (Default)
- CSV
The create_task method makes it possible to run a bulk search based on a given query hash. A convenient download_sync method is provided to store the result as a file:
task = dtl.BulkSearch.create_task(query_hash='<some query hash>')
csv = task.download_sync(output=Output.CSV)
The following Output formats are available:
- JSON
- JSON_ZIP
- CSV
- CSV_ZIP
- STIX_ZIP
The STIX_ZIP format is only available if the task was created for STIX export, using the for_stix_export parameter. When using the STIX_ZIP format, the API returns a zip file containing JSON files with a maximum of 5000 threats per file.
With STIX exports, you might also use the following options to filter the result:
- indicators_only: Exports only indicators.
- indicators_and_threat_entities_only: Exports only indicators and threat entities.
- None of these: Exports indicators, threat entities and sightings.
task = dtl.BulkSearch.create_task(for_stix_export=True, indicators_only=True, query_hash='<some query hash>')
stix = task.download_sync_stream_to_file(output=Output.STIX_ZIP, output_path="stix_export.zip")
Note
- download_sync accepts a stream=True parameter that changes what the function returns: instead of the plain response body, it returns the Response object from the requests library. This allows the body to be retrieved as a stream.
- task.download_sync_stream_to_file('<absolute output path>', output=Output.JSON) is a helper function that does just that, storing the output in a file while keeping RAM usage low and independent of the size of the bulk search result.
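To illustrate why streaming keeps memory usage flat, here is a minimal sketch that writes a body chunk by chunk. It is not the library's implementation: write_chunks_to_file is a hypothetical helper, and the generator below stands in for a real streamed response. With a requests Response obtained through stream=True, response.iter_content(chunk_size=8192) yields chunks of this kind.

```python
import os
import tempfile


def write_chunks_to_file(chunks, output_path):
    """Hypothetical helper: write an iterable of byte chunks to a file, return bytes written."""
    written = 0
    with open(output_path, "wb") as handle:
        for chunk in chunks:
            if chunk:  # skip empty chunks
                handle.write(chunk)
                written += len(chunk)
    return written


# Stand-in for a streamed body: only one 1 KiB chunk is held in memory at a time.
fake_stream = (b"x" * 1024 for _ in range(4))
demo_path = os.path.join(tempfile.mkdtemp(), "bulk_search.json")
total = write_chunks_to_file(fake_stream, demo_path)
print(total)
```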
Depending on your use case, you can call an async version, for example to wait for several bulk searches in parallel:
import asyncio
from datalake import Datalake, Output
dtl = Datalake(username='username', password='password')
# Queuing multiple bulk searches at once saves a lot of time.
# However, you will receive an HTTP 400 error if you enqueue too many bulk searches at once (more than 10).
query_hashes_to_process = [
'7018d41944b71b04a9d3785b3741c842',
'207d02c81edde3c87f665451f04f9bd1',
'9f7a8fecb0a74e508d6873c4d6e0d614',
'8bd8f1b47ce1a76ac2a1dc9e91aa9a5e',
'd3f8e2006554aaffa554714c614acd30',
]
coroutines = []
for query_hash in query_hashes_to_process:
    task = dtl.BulkSearch.create_task(query_hash=query_hash)
    coroutines.append(task.download_async(output=Output.JSON))
loop = asyncio.get_event_loop()
future = asyncio.gather(*coroutines)
results = loop.run_until_complete(future)
# Since results keep their order, we can easily map each query_hash back to its result
result_per_query_hash = {}
for query_hash, result in zip(query_hashes_to_process, results):
    result_per_query_hash[query_hash] = result
print(result_per_query_hash)
# will output:
# {
#     "query_hash_1": {"result of query_hash 1"},
#     "query_hash_2": {"result of query_hash 2"},
#     ...
# }
You can call the add_threat function to add a single threat at a time and retrieve details from the newly submitted threat.
from datalake import Datalake
from datalake import ThreatType, OverrideType, IpAtom, EmailAtom, FileAtom, Hashes, EmailFlow, IpService
dtl = Datalake(username='username', password='password')
# Example 1: Adding a "File" threat corresponding to an empty file
hashes = Hashes(
md5='d41d8cd98f00b204e9800998ecf8427e',
sha1='da39a3ee5e6b4b0d3255bfef95601890afd80709',
sha256='e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
)
empty_file = FileAtom(
hashes=hashes,
filesize=0,
filetype='txt',
filename=['empty.txt'],
external_analysis_link=['https://www.computerhope.com/issues/ch001314.htm']
)
dtl.Threats.add_threat(
atom=empty_file,
threat_types=[{'threat_type': ThreatType.MALWARE, 'score': 0}],
override_type=OverrideType.TEMPORARY,
public=True,
tags=['empty_file']
)
# Example 2: Adding an "IP" threat corresponding to Google DNS IP
dns_service = IpService(
port=53,
service_name='dns',
application='dns',
protocol='udp'
)
google_dns_ip = IpAtom(
ip_address='8.8.8.8',
external_analysis_link=['https://www.virustotal.com/gui/ip-address/8.8.8.8'],
ip_version=4,
services=[dns_service],
owner='Google'
)
dtl.Threats.add_threat(
atom=google_dns_ip,
threat_types=[{'threat_type': ThreatType.MALWARE, 'score': 0}],
override_type=OverrideType.TEMPORARY,
public=True,
tags=['google_dns']
)
# Example 3: Adding an "Email" threat corresponding to noreply OCD email
my_email = EmailAtom(
email='noreply@orangecyberdefense.com',
email_flow=EmailFlow.FROM,
external_analysis_link=['https://www.orangecyberdefense.com']
)
dtl.Threats.add_threat(
atom=my_email,
threat_types=[{'threat_type': ThreatType.SPAM, 'score': 0}],
override_type=OverrideType.TEMPORARY,
whitelist=True,
public=True,
tags=['ocd']
)
The following positional arguments are required:
- atom: an instance of an Atom class, for example IpAtom
The following keyword arguments are available:
- threat_types: A list of dictionaries, each containing a key named threat_type with a ThreatType value and a key named score with an integer value between 0 and 100. Available ThreatType options are: DDOS, FRAUD, HACK, LEAK, MALWARE, PHISHING, SCAM, SCAN, SPAM. Defaults to None.
- override_type: an OverrideType. Available options are:
  - TEMPORARY: All values should override any values provided by older IOCs, but not newer ones.
  - LOCK: All values will override any values provided by both newer and older IOCs for three months. Newer IOCs with override_type lock can still override old lock changes. Will act like a temporary after three months.
- whitelist: A boolean. If no threat_types are provided, this argument should be set to True; all score values will then be set to 0. Providing threat_types along with whitelist set to True results in an error. Defaults to False.
- public: A boolean, sets whether the threats should be public or private. Defaults to True.
- tags: a List of strings. Will set the tags of the added threat(s).
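The constraints on threat_types and whitelist listed above can be checked locally before calling the API. The sketch below is a hypothetical pre-flight check, not part of the library; plain strings stand in for ThreatType values so that the example runs on its own.

```python
def check_threat_arguments(threat_types=None, whitelist=False):
    """Hypothetical check mirroring the rules described above; raises ValueError on a violation."""
    if whitelist and threat_types:
        raise ValueError("threat_types cannot be combined with whitelist=True")
    if not whitelist and not threat_types:
        raise ValueError("provide threat_types or set whitelist=True")
    for entry in threat_types or []:
        if set(entry) != {"threat_type", "score"}:
            raise ValueError(f"unexpected keys in {entry!r}")
        score = entry["score"]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(score, bool) or not isinstance(score, int) or not 0 <= score <= 100:
            raise ValueError(f"score must be an integer between 0 and 100, got {score!r}")
    return True


print(check_threat_arguments([{"threat_type": "malware", "score": 0}]))
print(check_threat_arguments(whitelist=True))
```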
You can call the add_threats function to add multiple threats at once. They
from datalake import AtomType, OverrideType, ThreatType
atom_list = ['12.34.56.78', '9.8.7.6']
threat_types = [{'threat_type': ThreatType.DDOS, 'score': 20}]
dtl.Threats.add_threats(
    atom_list,
    AtomType.IP,
    threat_types,
    OverrideType.TEMPORARY,
    external_analysis_link=['https://someurl.com'],
    tags=['some_tag'],
    public=False
)
The following positional arguments are required:
- atom_list: a List of strings. Contains the list of threats to add. In our example it's a list of IPs.
- atom_type: an AtomType. Available options are: APK, AS, CC, CRYPTO, CVE, DOMAIN, EMAIL, FILE, FQDN, IBAN, IP, IP_RANGE, PASTE, PHONE_NUMBER, REGKEY, SSL, URL
The following keyword arguments are available:
- threat_types: A list of dictionaries, each containing a key named threat_type with a ThreatType value and a key named score with an integer value between 0 and 100. Available ThreatType options are: DDOS, FRAUD, HACK, LEAK, MALWARE, PHISHING, SCAM, SCAN, SPAM. Defaults to None.
- override_type: an OverrideType. Available options are:
  - TEMPORARY: All values should override any values provided by older IOCs, but not newer ones.
  - LOCK: All values will override any values provided by both newer and older IOCs for three months. Newer IOCs with override_type lock can still override old lock changes. Will act like a temporary after three months.
- whitelist: A boolean. If no threat_types are provided, this argument should be set to True; all score values will then be set to 0. Providing threat_types along with whitelist set to True results in an error. Defaults to False.
- public: A boolean, sets whether the threats should be public or private. Defaults to True.
- tags: a List of strings. Will set the tags of the added threat(s).
- external_analysis_link: a List of strings. A link to an external resource providing more information about the threat.
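Since add_threats takes a single atom_type for the whole list, a mixed list of values has to be split by type first. The sketch below separates IP addresses from everything else using the standard ipaddress module; split_ip_atoms is a hypothetical helper and only covers the IP case.

```python
import ipaddress


def split_ip_atoms(values):
    """Hypothetical helper: return (ips, others), preserving the input order."""
    ips, others = [], []
    for value in values:
        try:
            ipaddress.ip_address(value)
        except ValueError:
            others.append(value)
        else:
            ips.append(value)
    return ips, others


ips, others = split_ip_atoms(['12.34.56.78', 'example.com', '9.8.7.6'])
print(ips)
print(others)
```

The ips list could then be submitted with AtomType.IP, and the remaining values handled separately.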
A quick and easy way to add tags to a threat:
hashkey = '00000001655688982ec8ba4058f02dd1'
tags = ['green', 'white']
public = False
dtl.Tags.add_to_threat(hashkey, tags, public)
A quick and easy way to add a comment to multiple threats:
hashkeys = ['00000001655688982ec8ba4058f02dd1']
comment = "some comment"
public = False
dtl.Comments.post_comments(hashkeys, comment, public)
It returns two lists: the hashkeys of the threats that were correctly updated, and the hashkeys of those that were not updated with the comment.
Filtered & Sorted Threat Entities List Retrieval Tutorial
import json
from datalake import Datalake
dtl = Datalake(username='<username>', password='<password>')
threat_entities_list = dtl.FilteredThreatEntity.get_filtered_and_sorted_list()
# To print to console
print(json.dumps(threat_entities_list, indent=4))
# To write to a file
with open('output.json', 'w') as f:
    json.dump(threat_entities_list, f, indent=4)
You can use function parameters to filter and/or sort the results.
Multiple threats can be edited at once, each threat type independently:
from datalake import Datalake, ThreatType, OverrideType
dtl = Datalake(username='username', password='password')
hashkeys = [
'00000001655688982ec8ba4058f02dd1',
'00000001655688982ec8ba4058f02dd2',
]
threat_scores_list = [
{'threat_type': ThreatType.DDOS, 'score': 5},
{'threat_type': ThreatType.PHISHING, 'score': 25},
]
override_type = OverrideType.TEMPORARY
dtl.Threats.edit_score_by_hashkeys(hashkeys, threat_scores_list, override_type)
Query hashes can also be used, with another function provided for that purpose:
from datalake import Datalake, ThreatType, OverrideType
dtl = Datalake(username='username', password='password')
query_body_hash = '7018d41944b71b04a9d3785b3741c842'
threat_scores_list = [
{'threat_type': ThreatType.DDOS, 'score': 5},
{'threat_type': ThreatType.PHISHING, 'score': 25},
]
override_type = OverrideType.TEMPORARY
dtl.Threats.edit_score_by_query_body_hash(query_body_hash, threat_scores_list, override_type)
The library can be used to execute an advanced search if you have a query hash or a query body, using advanced_search_from_query_hash or advanced_search_from_query_body.
from datalake import Datalake, Output
dtl = Datalake(username='username', password='password')
query_body = {
"AND": [
{
"AND": [
{
"field": "atom_type",
"multi_values": [
"ip"
],
"type": "filter"
},
{
"field": "risk",
"range": {
"gt": 60
},
"type": "filter"
}
]
}
]
}
query_hash = 'cece3117abc823cee81e69c2143e6268'
adv_search_hash_resp = dtl.AdvancedSearch.advanced_search_from_query_hash(
    query_hash, limit=20, offset=0, ordering=['first_seen'], output=Output.JSON)
adv_search_body_resp = dtl.AdvancedSearch.advanced_search_from_query_body(
    query_body, limit=20, offset=0, ordering=['-first_seen'], output=Output.JSON)
You can check whether the sources in a provided list (of source ids) exist in Datalake. The call returns two elements:
- a boolean set to True if all sources exist in Datalake, False otherwise
- a list containing the non-existing sources ids
from datalake import Datalake
dtl = Datalake(username='username', password='password')
bool_all_sources_exists, invalid_sources = dtl.Sources.check_sources(["source_a","source_b"])
Sightings can be submitted in bulk with the library, using a list of atoms. Each sighting can contain several atoms or hashkeys. Within the same sighting, they will share properties.
First, create a list of sighting dicts, then submit this list. This is the preferred method (it helps avoid being rate-limited):
from datalake import Datalake, IpAtom, EmailAtom, UrlAtom, FileAtom, Hashes, SightingType, Visibility, ThreatType
import datetime
dtl = Datalake(username='username', password='password')
# 1) Create the list of sightings
# sighting 1
# Prepare atoms for which we want to generate a sighting
f1 = FileAtom(hashes=Hashes(
md5='d41d8cd98f00b204e9800998ecf8427e',
sha1='da39a3ee5e6b4b0d3255bfef95601890afd80709',
sha256='e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
))
ip1 = IpAtom('52.48.79.33')
em1 = EmailAtom('hacker@hacker.fr')
url1 = UrlAtom('http://notfishing.com')
# You also need to define additional properties for these atoms, shared by all atoms within the same sighting.
## Prepare threat types for this sighting
threat_types = [ThreatType.PHISHING, ThreatType.SCAM]
## Prepare start and end timestamps for this sighting
start = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
end = datetime.datetime.now(datetime.timezone.utc)
sighting_1 = {
"atoms":[ip1, f1, em1, url1],
"start_timestamp":start,
"end_timestamp":end,
"sighting_type":SightingType.POSITIVE,
"description_visibility":Visibility.PUBLIC,
"count":1,
"threat_types":threat_types,
"tags":['some_tag'],
"description":'some_description',
"editable":True
}
# sighting 2
# Prepare hashkeys for which we want to generate a sighting
hashkeys = [
"d41d8cd98f00b204e9800998ecf8427e",
"a7b25b324871a7695aa2cc5d09681dda",
"1ed07771327e850255b09b042ad00e3d",
"bf1f33c3a56e1dfda6a2f4f3d3e4361a"
]
# You also need to define additional properties for these hashkeys, shared by all hashkeys within the same sighting.
## Prepare threat types for this sighting
threat_types = [ThreatType.PHISHING, ThreatType.SCAM]
## Prepare start and end timestamps for this sighting
start = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=4)
end = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2)
sighting_2 = {
"hashkeys":hashkeys,
"start_timestamp":start,
"end_timestamp":end,
"sighting_type":SightingType.NEUTRAL,
"description_visibility":Visibility.PUBLIC,
"count":3,
"threat_types":threat_types,
"tags":['some_tag_bis'],
"description":'some_description_bis',
"editable":False
}
# Create the list of sightings
list_sightings = [sighting_1,sighting_2]
# 2) Submit sightings in bulk
dtl.Sightings.bulk_submit_sightings(
    sightings=list_sightings
)
A single sighting can also be submitted with submit_sighting:
from datalake import Datalake, IpAtom, EmailAtom, UrlAtom, FileAtom, Hashes, SightingType, Visibility, ThreatType
import datetime
dtl = Datalake(username='username', password='password')
# Prepare atoms for which we want to generate a sighting
f1 = FileAtom(hashes=Hashes(
md5='d41d8cd98f00b204e9800998ecf8427e',
sha1='da39a3ee5e6b4b0d3255bfef95601890afd80709',
sha256='e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
))
ip1 = IpAtom('52.48.79.33')
em1 = EmailAtom('hacker@hacker.fr')
url1 = UrlAtom('http://notfishing.com')
# You also need to define additional properties for these atoms, shared by all atoms within the same sighting.
## Prepare threat types for this sighting
threat_types = [ThreatType.PHISHING, ThreatType.SCAM]
## Prepare start and end timestamps for this sighting
start = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
end = datetime.datetime.now(datetime.timezone.utc)
# Submit the atoms in a single sighting
dtl.Sightings.submit_sighting(
atoms=[ip1, f1, em1, url1],
start_timestamp=start,
end_timestamp=end,
sighting_type=SightingType.POSITIVE,
description_visibility=Visibility.PUBLIC,
count=1,
threat_types=threat_types,
tags=['some_tag'],
description='some_description',
editable=True
)
Alternatively, the same can also be achieved using a list of hashkeys:
from datalake import Datalake, SightingType, Visibility, ThreatType
import datetime
dtl = Datalake(username='username', password='password')
# Prepare hashkeys for which we want to generate a sighting
hashkeys = [
"d41d8cd98f00b204e9800998ecf8427e",
"a7b25b324871a7695aa2cc5d09681dda",
"1ed07771327e850255b09b042ad00e3d",
"bf1f33c3a56e1dfda6a2f4f3d3e4361a"
]
# You also need to define additional properties for these hashkeys, shared by all hashkeys within the same sighting.
## Prepare threat types for this sighting
threat_types = [ThreatType.PHISHING, ThreatType.SCAM]
## Prepare start and end timestamps for this sighting
start = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
end = datetime.datetime.now(datetime.timezone.utc)
# Submit the hashkeys in a single sighting
dtl.Sightings.submit_sighting(
hashkeys=hashkeys,
start_timestamp=start,
end_timestamp=end,
sighting_type=SightingType.POSITIVE,
description_visibility=Visibility.PUBLIC,
count=1,
threat_types=threat_types,
tags=['some_tag'],
description='some_description',
editable=True
)
The atom_type file provides multiple classes to build each type of atom used by the API. The classes give you hints on the values expected for each atom type, most of which aren't mandatory. Sightings do not use most of these fields; you can check which fields are used for sightings in the docstrings of each class, from within your editor.
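Before submitting, it can help to sanity-check each sighting dict locally. The sketch below is a hypothetical check, not something the library provides: it assumes a sighting needs at least one of atoms or hashkeys, and timezone-aware timestamps with start before end, as in the examples above.

```python
from datetime import datetime, timedelta, timezone


def check_sighting(sighting):
    """Hypothetical pre-submission check for a sighting dict; raises ValueError on a problem."""
    if not sighting.get("atoms") and not sighting.get("hashkeys"):
        raise ValueError("a sighting needs atoms or hashkeys")
    start, end = sighting["start_timestamp"], sighting["end_timestamp"]
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("timestamps should be timezone-aware")
    if start > end:
        raise ValueError("start_timestamp is after end_timestamp")
    return True


now = datetime.now(timezone.utc)
demo_sighting = {
    "hashkeys": ["d41d8cd98f00b204e9800998ecf8427e"],
    "start_timestamp": now - timedelta(hours=1),
    "end_timestamp": now,
}
print(check_sighting(demo_sighting))
```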
It is possible to search sightings from either a hashkey or an atom_value. All parameters are optional.
From a hashkey:
dtl.Sightings.sightings_filtered(
"f39cbce3c4d30d61ccdc99c5fcb3bf6f",
limit=100,
offset=0,
sighting_type=SightingType.POSITIVE,
description_visibility=Visibility.PUBLIC
)
From an atom value:
dtl.Sightings.sightings_filtered_from_atom_value(
"8.8.8.8",
limit=100,
offset=0,
sighting_type=SightingType.POSITIVE,
description_visibility=Visibility.PUBLIC
)
See the API documentation below for a list of available options.
It is possible to monitor (watch) a search to find new IOCs that match your search criteria, through the search_watch method.
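The limit and offset parameters shown above lend themselves to simple pagination. The sketch below is generic: iterate_pages and fetch_page are hypothetical names, and it assumes each call can be reduced to a list of items, which is not guaranteed to match the shape of the real response.

```python
def iterate_pages(fetch_page, limit=100):
    """Yield items page by page until a short or empty page signals the end."""
    offset = 0
    while True:
        page = fetch_page(limit=limit, offset=offset)
        yield from page
        if len(page) < limit:
            break
        offset += limit


# Stand-in for a real call: slices a local list the way limit/offset would.
fake_sightings = [{"id": i} for i in range(250)]


def fake_fetch(limit, offset):
    return fake_sightings[offset:offset + limit]


all_items = list(iterate_pages(fake_fetch, limit=100))
print(len(all_items))
```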
dtl.SearchWatch.search_watch(
query_body={<some query body>},
output_folder='<some/folder/path/>',
reference_file='<full/file/path/to/compare/with>'
)
It takes either a query_body or a query_hash as required input, and the following optional inputs:
- output_folder: the folder where all the result JSON files of the bulk search will be stored. Defaults to the current folder.
- reference_file: the full path of the file that serves as the reference for the comparison with the new bulk search result. That file needs to be in the format <query_hash>-<timestamp>.json. By default, the most recently generated file in the output folder is used as reference_file.
- save_diff_threats: when set to True, the results of the search_watch method are stored in a JSON file <queryhashkey>-diff_threats-<timestamp>.json containing added and removed threats, within the output_folder that was defined. Defaults to False.
The search_watch method returns a dict (JSON) as follows:
{
'from': '2023-11-01 15:14:00',
'to': '2023-11-07 10:22:00',
'added':
{
(
'hashkey': '33565c33a858888c78704fea22669707',
'atom_value': '8.8.8.8'
)
},
'removed':
{
(
'hashkey': '13565b07b857777d08714ffd22003403',
'atom_value': '1.1.1.1'
)
}
}
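Conceptually, the added and removed entries amount to a difference between two result sets keyed by hashkey. The sketch below illustrates that idea on plain lists of dicts; diff_threats is a hypothetical function and does not reflect how the library computes or structures its result.

```python
def diff_threats(reference, current):
    """Hypothetical illustration: compare two lists of threats by hashkey."""
    ref_by_key = {threat["hashkey"]: threat for threat in reference}
    cur_by_key = {threat["hashkey"]: threat for threat in current}
    added = [cur_by_key[key] for key in cur_by_key if key not in ref_by_key]
    removed = [ref_by_key[key] for key in ref_by_key if key not in cur_by_key]
    return {"added": added, "removed": removed}


reference = [
    {"hashkey": "13565b07b857777d08714ffd22003403", "atom_value": "1.1.1.1"},
    {"hashkey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "atom_value": "unchanged.example"},
]
current = [
    {"hashkey": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "atom_value": "unchanged.example"},
    {"hashkey": "33565c33a858888c78704fea22669707", "atom_value": "8.8.8.8"},
]
diff = diff_threats(reference, current)
print(diff)
```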
You can retrieve information about the current user of your instance. For example, it can be useful to check for your user:
- its status
- its role and permissions
- its organization
- its request limit
current_user_info = dtl.MyAccount.me()
print(current_user_info)
For more information on the API used by this library, see the documentation.