Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 102 additions & 32 deletions pyghee/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
# see https://github.com/boegel/pyghee
#
# author: Kenneth Hoste (@boegel)
# author: Sondre Bergsvaag Risanger (@sondrebr)
#
# license: GPLv2
#
import base64
import datetime
import flask
import hmac
Expand All @@ -20,32 +22,52 @@
from .utils import create_file, error, log, log_warning

EVENTS_LOG_DIR = os.path.join(os.getcwd(), 'events_log')
GITHUB = "github"
GITLAB = "gitlab"
SHA1 = 'sha1'
SHA256 = 'sha256'
STANDARD_WEBHOOKS_SIGN_PREFIX = 'v1,'
STANDARD_WEBHOOKS_SECRET_PREFIX = 'whsec_'
UNKNOWN = 'UNKNOWN'


def get_event_info(request):
def get_event_info(request, event_source=GITHUB):
"""
Extract event info from raw header data, and return result as Python dictionary value
"""
event_info = {
'action': request.json.get('action', UNKNOWN),
'id': request.headers['X-Github-Delivery'],
'signature-sha1': request.headers['X-Hub-Signature'],
'timestamp_raw': request.headers['Timestamp'],
'type': request.headers['X-GitHub-Event'],
}
if event_source == GITHUB:
event_info = {
'action': request.json.get('action', UNKNOWN),
'id': request.headers['X-Github-Delivery'],
'signature-sha1': request.headers['X-Hub-Signature'],
'timestamp_raw': request.headers['Timestamp'],
'type': request.headers['X-GitHub-Event'],
}
ts = int(event_info['timestamp_raw'])/1000.
elif event_source == GITLAB:
object_attributes = request.json.get('object_attributes', {})
event_info = {
'action': object_attributes.get('action', UNKNOWN),
'id': request.headers['Webhook-Id'],
'signature-sha1': request.headers['Webhook-Signature'],
'timestamp_raw': request.headers['Webhook-Timestamp'],
'type': request.json.get("event_type", UNKNOWN),
}
ts = int(event_info['timestamp_raw'])
else:
error(f"Unsupported event source: {event_source}")

event_info.update({
'raw_request_body': request.json,
'raw_request_data': request.data,
'raw_request_headers': dict(request.headers),
})

timestamp = datetime.datetime.utcfromtimestamp(int(event_info['timestamp_raw'])/1000.)
timestamp = datetime.datetime.fromtimestamp(ts, tz=datetime.UTC)
iso_timestamp = timestamp.isoformat(timespec="seconds")
event_info['timestamp'] = timestamp
event_info['date'] = timestamp.isoformat().split('T')[0]
event_info['time'] = timestamp.isoformat().split('T')[1].split('.')[0].replace(':', '-')
event_info['date'] = iso_timestamp.split('T')[0]
event_info['time'] = iso_timestamp.split('T')[1].split('+')[0].replace(':', '-')

return event_info

Expand All @@ -64,26 +86,41 @@ def read_event_from_json(jsonfile):

class PyGHee(flask.Flask):

def __init__(self, *args, **kwargs):
def __init__(self, event_source=GITHUB, *args, **kwargs):
"""
PyGHee constructor.
"""
super(PyGHee, self).__init__('PyGHee', *args, **kwargs)
self.event_source = event_source.lower()

github_token = os.getenv('GITHUB_TOKEN')
if github_token is None:
error("GitHub token is not available via $GITHUB_TOKEN!")
else:
del os.environ['GITHUB_TOKEN']
if self.event_source == GITHUB:
github_token = os.getenv('GITHUB_TOKEN')
if github_token is None:
error("GitHub token is not available via $GITHUB_TOKEN!")
else:
del os.environ['GITHUB_TOKEN']

self.gh = github.Github(github_token)
self.gh = github.Github(github_token)

# see https://docs.github.com/en/developers/webhooks-and-events/securing-your-webhooks
self.github_app_secret_token = os.getenv('GITHUB_APP_SECRET_TOKEN')
if self.github_app_secret_token is None:
error("Webhook secret is not available via $GITHUB_APP_SECRET_TOKEN!")
# see https://docs.github.com/en/developers/webhooks-and-events/securing-your-webhooks
self.github_app_secret_token = os.getenv('GITHUB_APP_SECRET_TOKEN')
if self.github_app_secret_token is None:
error("Webhook secret is not available via $GITHUB_APP_SECRET_TOKEN!")
else:
del os.environ['GITHUB_APP_SECRET_TOKEN']
elif self.event_source == GITLAB:
secret_token = os.getenv('GITLAB_WEBHOOK_SECRET_TOKEN')
if secret_token is None:
error("Webhook secret is not available via $GITLAB_WEBHOOK_SECRET_TOKEN!")
else:
# Strip 'whsec_' prefix if present
secret_token = secret_token.removeprefix(STANDARD_WEBHOOKS_SECRET_PREFIX)
self.gitlab_webhook_secret_token = secret_token
del os.environ['GITLAB_WEBHOOK_SECRET_TOKEN']
else:
del os.environ['GITHUB_APP_SECRET_TOKEN']
error_msg = f"'{self.event_source}' is not a supported webhook source."
error_msg += " Supported event_source values are 'github' and 'gitlab'."
error(error_msg)

self.registered_events = []

Expand Down Expand Up @@ -149,43 +186,76 @@ def log_event(self, event_info, events_log_dir=None, log_file=None):
def verify_request(self, event_info, abort_function, log_file=None):
"""
Verify request by checking webhook secret in request header.
Webhook secret must also be available in $GITHUB_APP_SECRET_TOKEN environment variable.
Webhook secret must also be available in $GITHUB_APP_SECRET_TOKEN
or $GITLAB_WEBHOOK_SECRET_TOKEN environment variable.
"""

header_signature = event_info['signature-sha1']
# if no signature is found, the request is forbidden
if header_signature is None:
log_warning("Missing signature in request header => 403", log_file=log_file)
abort_function(403)
# GitLab uses "v1,<b64sign>" format for signatures
# https://docs.gitlab.com/user/project/integrations/webhooks/#signing-tokens
elif self.event_source == GITLAB:
# GitLab sends a space-separated list of signatures
signatures = header_signature.split(" ")

if not any(signature.startswith("v1,") for signature in signatures):
log_warning("No known signature types in request header => 501", log_file=log_file)
abort_function(501)

user_agent = event_info['raw_request_headers'].get("User-Agent", "")
if not user_agent.lower().startswith(GITLAB):
log_warning("Webhook user agent not supported (%s) => 501" % user_agent, log_file=log_file)
abort_function(501)

secret_token = self.gitlab_webhook_secret_token
try:
key = base64.standard_b64decode(secret_token)
except Exception as err:
raise Exception(f"Configured key is not base64-encoded: {err}")

timestamp = event_info['timestamp_raw'].encode('utf8')
webhook_id = event_info['id'].encode('utf8')
request_data = event_info['raw_request_data']
components = (webhook_id, timestamp, request_data)
mac = hmac.new(key, msg=b'.'.join(components), digestmod=SHA256)
expected_signature = "v1," + base64.standard_b64encode(mac.digest()).decode()
verified = any(hmac.compare_digest(expected_signature, signature) for signature in signatures)
# GitHub uses "sha<#>=<hexsign>" signature format
else:
header_parts = header_signature.split('=')
if len(header_parts) == 2:
signature_type, signature = header_parts
if signature_type == SHA1:
# see https://docs.python.org/3/library/hmac.html
request_data = event_info['raw_request_data']
mac = hmac.new(self.github_app_secret_token.encode(), msg=request_data, digestmod=SHA1)
if hmac.compare_digest(str(mac.hexdigest()), str(signature)):
log("Request verified: signature OK!", log_file=log_file)
else:
log_warning("Faulty signature in request header => 403", log_file=log_file)
abort_function(403)
secret_token = self.github_app_secret_token
mac = hmac.new(secret_token.encode(), msg=request_data, digestmod=signature_type)
verified = hmac.compare_digest(str(mac.hexdigest()), str(signature))
else:
# we only know how to verify a SHA1 signature
# we only know how to verify SHA1 signatures
log_warning("Uknown type of signature (%s) => 501" % signature_type, log_file=log_file)
abort_function(501)
else:
log_warning("Type of signature not specified (%s) => 501" % header_signature, log_file=log_file)
abort_function(501)

if verified:
log("Request verified: signature OK!", log_file=log_file)
else:
log_warning("Faulty signature in request header => 403", log_file=log_file)
abort_function(403)

def process_event(self, request, abort_function,
events_log_dir=None, log_file=None, raise_error=False, verify=True):
"""
Process a single event (log + verify + handle).
Logs a warning in case of crash while processing event.
"""
try:
event_info = get_event_info(request)
event_info = get_event_info(request, self.event_source)
event_id = event_info['id']
if self.register_event(event_id):
self.log_event(event_info, events_log_dir=events_log_dir, log_file=log_file)
Expand Down