diff --git a/pyghee/lib.py b/pyghee/lib.py index a7ccdd9..b9c83d4 100644 --- a/pyghee/lib.py +++ b/pyghee/lib.py @@ -3,9 +3,11 @@ # see https://github.com/boegel/pyghee # # author: Kenneth Hoste (@boegel) +# author: Sondre Bergsvaag Risanger (@sondrebr) # # license: GPLv2 # +import base64 import datetime import flask import hmac @@ -20,21 +22,40 @@ from .utils import create_file, error, log, log_warning EVENTS_LOG_DIR = os.path.join(os.getcwd(), 'events_log') +GITHUB = "github" +GITLAB = "gitlab" SHA1 = 'sha1' +SHA256 = 'sha256' +STANDARD_WEBHOOKS_SIGN_PREFIX = 'v1,' +STANDARD_WEBHOOKS_SECRET_PREFIX = 'whsec_' UNKNOWN = 'UNKNOWN' -def get_event_info(request): +def get_event_info(request, event_source=GITHUB): """ Extract event info from raw header data, and return result as Python dictionary value """ - event_info = { - 'action': request.json.get('action', UNKNOWN), - 'id': request.headers['X-Github-Delivery'], - 'signature-sha1': request.headers['X-Hub-Signature'], - 'timestamp_raw': request.headers['Timestamp'], - 'type': request.headers['X-GitHub-Event'], - } + if event_source == GITHUB: + event_info = { + 'action': request.json.get('action', UNKNOWN), + 'id': request.headers['X-Github-Delivery'], + 'signature-sha1': request.headers['X-Hub-Signature'], + 'timestamp_raw': request.headers['Timestamp'], + 'type': request.headers['X-GitHub-Event'], + } + ts = int(event_info['timestamp_raw'])/1000. + elif event_source == GITLAB: + object_attributes = request.json.get('object_attributes', {}) + event_info = { + 'action': object_attributes.get('action', UNKNOWN), + 'id': request.headers['Webhook-Id'], + 'signature-sha1': request.headers['Webhook-Signature'], + 'timestamp_raw': request.headers['Webhook-Timestamp'], + 'type': request.json.get("event_type", UNKNOWN), + } + ts = int(event_info['timestamp_raw']) + else: + error(f"Unsupported event source: {event_source}") event_info.update({ 'raw_request_body': request.json, @@ -42,10 +63,11 @@ def get_event_info(request): 'raw_request_headers': dict(request.headers), }) - timestamp = datetime.datetime.utcfromtimestamp(int(event_info['timestamp_raw'])/1000.) + timestamp = datetime.datetime.fromtimestamp(ts, tz=datetime.UTC) + iso_timestamp = timestamp.isoformat(timespec="seconds") event_info['timestamp'] = timestamp - event_info['date'] = timestamp.isoformat().split('T')[0] - event_info['time'] = timestamp.isoformat().split('T')[1].split('.')[0].replace(':', '-') + event_info['date'] = iso_timestamp.split('T')[0] + event_info['time'] = iso_timestamp.split('T')[1].split('+')[0].replace(':', '-') return event_info @@ -64,26 +86,41 @@ def read_event_from_json(jsonfile): class PyGHee(flask.Flask): - def __init__(self, *args, **kwargs): + def __init__(self, event_source=GITHUB, *args, **kwargs): """ PyGHee constructor. """ super(PyGHee, self).__init__('PyGHee', *args, **kwargs) + self.event_source = event_source.lower() - github_token = os.getenv('GITHUB_TOKEN') - if github_token is None: - error("GitHub token is not available via $GITHUB_TOKEN!") - else: - del os.environ['GITHUB_TOKEN'] + if self.event_source == GITHUB: + github_token = os.getenv('GITHUB_TOKEN') + if github_token is None: + error("GitHub token is not available via $GITHUB_TOKEN!") + else: + del os.environ['GITHUB_TOKEN'] - self.gh = github.Github(github_token) + self.gh = github.Github(github_token) - # see https://docs.github.com/en/developers/webhooks-and-events/securing-your-webhooks - self.github_app_secret_token = os.getenv('GITHUB_APP_SECRET_TOKEN') - if self.github_app_secret_token is None: - error("Webhook secret is not available via $GITHUB_APP_SECRET_TOKEN!") + # see https://docs.github.com/en/developers/webhooks-and-events/securing-your-webhooks + self.github_app_secret_token = os.getenv('GITHUB_APP_SECRET_TOKEN') + if self.github_app_secret_token is None: + error("Webhook secret is not available via $GITHUB_APP_SECRET_TOKEN!") + else: + del os.environ['GITHUB_APP_SECRET_TOKEN'] + elif self.event_source == GITLAB: + secret_token = os.getenv('GITLAB_WEBHOOK_SECRET_TOKEN') + if secret_token is None: + error("Webhook secret is not available via $GITLAB_WEBHOOK_SECRET_TOKEN!") + else: + # Strip 'whsec_' prefix if present + secret_token = secret_token.removeprefix(STANDARD_WEBHOOKS_SECRET_PREFIX) + self.gitlab_webhook_secret_token = secret_token + del os.environ['GITLAB_WEBHOOK_SECRET_TOKEN'] else: - del os.environ['GITHUB_APP_SECRET_TOKEN'] + error_msg = f"'{self.event_source}' is not a supported webhook source." + error_msg += " Supported event_source values are 'github' and 'gitlab'." + error(error_msg) self.registered_events = [] @@ -149,7 +186,8 @@ def log_event(self, event_info, events_log_dir=None, log_file=None): def verify_request(self, event_info, abort_function, log_file=None): """ Verify request by checking webhook secret in request header. - Webhook secret must also be available in $GITHUB_APP_SECRET_TOKEN environment variable. + Webhook secret must also be available in $GITHUB_APP_SECRET_TOKEN + or $GITLAB_WEBHOOK_SECRET_TOKEN environment variable. """ header_signature = event_info['signature-sha1'] @@ -157,6 +195,35 @@ def verify_request(self, event_info, abort_function, log_file=None): if header_signature is None: log_warning("Missing signature in request header => 403", log_file=log_file) abort_function(403) + # GitLab uses "v1," format for signatures + # https://docs.gitlab.com/user/project/integrations/webhooks/#signing-tokens + elif self.event_source == GITLAB: + # GitLab sends a space-separated list of signatures + signatures = header_signature.split(" ") + + if not any(signature.startswith("v1,") for signature in signatures): + log_warning("No known signature types in request header => 501", log_file=log_file) + abort_function(501) + + user_agent = event_info['raw_request_headers'].get("User-Agent", "") + if not user_agent.lower().startswith(GITLAB): + log_warning("Webhook user agent not supported (%s) => 501" % user_agent, log_file=log_file) + abort_function(501) + + secret_token = self.gitlab_webhook_secret_token + try: + key = base64.standard_b64decode(secret_token) + except Exception as err: + raise Exception(f"Configured key is not base64-encoded: {err}") + + timestamp = event_info['timestamp_raw'].encode('utf8') + webhook_id = event_info['id'].encode('utf8') + request_data = event_info['raw_request_data'] + components = (webhook_id, timestamp, request_data) + mac = hmac.new(key, msg=b'.'.join(components), digestmod=SHA256) + expected_signature = "v1," + base64.standard_b64encode(mac.digest()).decode() + verified = any(hmac.compare_digest(expected_signature, signature) for signature in signatures) + # GitHub uses "sha<#>=" signature format else: header_parts = header_signature.split('=') if len(header_parts) == 2: @@ -164,20 +231,23 @@ def verify_request(self, event_info, abort_function, log_file=None): if signature_type == SHA1: # see https://docs.python.org/3/library/hmac.html request_data = event_info['raw_request_data'] - mac = hmac.new(self.github_app_secret_token.encode(), msg=request_data, digestmod=SHA1) - if hmac.compare_digest(str(mac.hexdigest()), str(signature)): - log("Request verified: signature OK!", log_file=log_file) - else: - log_warning("Faulty signature in request header => 403", log_file=log_file) - abort_function(403) + secret_token = self.github_app_secret_token + mac = hmac.new(secret_token.encode(), msg=request_data, digestmod=signature_type) + verified = hmac.compare_digest(str(mac.hexdigest()), str(signature)) else: - # we only know how to verify a SHA1 signature + # we only know how to verify SHA1 signatures log_warning("Uknown type of signature (%s) => 501" % signature_type, log_file=log_file) abort_function(501) else: log_warning("Type of signature not specified (%s) => 501" % header_signature, log_file=log_file) abort_function(501) + if verified: + log("Request verified: signature OK!", log_file=log_file) + else: + log_warning("Faulty signature in request header => 403", log_file=log_file) + abort_function(403) + def process_event(self, request, abort_function, events_log_dir=None, log_file=None, raise_error=False, verify=True): """ @@ -185,7 +255,7 @@ def process_event(self, request, abort_function, Logs a warning in case of crash while processing event. """ try: - event_info = get_event_info(request) + event_info = get_event_info(request, self.event_source) event_id = event_info['id'] if self.register_event(event_id): self.log_event(event_info, events_log_dir=events_log_dir, log_file=log_file)