mirror of
https://gitlab.com/keys.openpgp.org/hagrid.git
synced 2025-10-05 16:12:44 +02:00
105 lines
4.0 KiB
Python
105 lines
4.0 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Simple flask server that checks whether a domain is allowed as a WKD target.
# Most importantly, this determines whether we attempt to request a certificate
# from letsencrypt for it.
#
# The fqdn must fulfill the following criteria:
# - it must start with "openpgpkey.", since only those are relevant for the WKD
#   advanced method
# - it must be directly below a public suffix. this makes it hard for anyone to
#   generate arbitrary numbers of subdomains.
# - it must be a CNAME that points to wkd.keys.openpgp.org. We do a simple DoH
#   request to cloudflare to make sure it looks correct from someone else's
#   perspective.
#
# Configuration via environment variables:
#
# - FLASK_GATEWAY_DOMAIN must be set to the gateway domain (e.g. wkd.keys.openpgp.org)
# - FLASK_ALLOWLIST_FILE may point to a file that contains an explicit allowlist of domains, one per line
|
|
|
|
import requests
|
|
import publicsuffix2
|
|
import flask
|
|
import markupsafe
|
|
import sys
|
|
|
|
# Application object. Settings are taken from FLASK_-prefixed environment
# variables, so FLASK_GATEWAY_DOMAIN ends up as app.config['GATEWAY_DOMAIN'].
app = flask.Flask('wkd-domain-checker')
app.config.from_prefixed_env()
if 'GATEWAY_DOMAIN' not in app.config:
    # Without a gateway domain no CNAME can be validated; refuse to start.
    app.logger.error('missing config: FLASK_GATEWAY_DOMAIN')
    sys.exit(1)

# NOTE(review): from_envvar() loads a *Python config file* whose path is given
# by the (unprefixed) ALLOWLIST_FILE environment variable; it does not read a
# domain list. Kept as-is for compatibility -- confirm whether it is intended.
app.config.from_envvar('ALLOWLIST_FILE', silent=True)

if 'ALLOWLIST_FILE' in app.config:
    with open(app.config['ALLOWLIST_FILE'], 'r', encoding='utf-8') as f:
        # One domain per line. Surrounding whitespace is stripped and blank
        # lines are dropped so that a stray space or empty line cannot turn
        # into an entry that never matches.
        app.config['EXPLICIT_ALLOWED_DOMAINS'] = [
            entry for entry in map(str.strip, f.read().splitlines()) if entry
        ]
else:
    app.config['EXPLICIT_ALLOWED_DOMAINS'] = []
|
|
|
|
@app.route('/status/')
@app.route('/')
def check():
    """Answer a domain check request.

    Reads the ``domain`` query parameter, passes it to ``check_domain``,
    logs the outcome at info level and returns it unchanged. A request
    without a (non-empty) ``domain`` parameter gets a 400 response.
    """
    requested = flask.request.args.get('domain')
    if requested:
        outcome = check_domain(requested)
        app.logger.info(f'{requested}: {outcome}')
        return outcome

    return 'missing parameter: domain\n', 400
|
|
|
|
def check_domain(domain, timeout=10):
    """Decide whether ``domain`` may be used as a WKD target.

    A domain is accepted when it is listed in the explicit allowlist, or when
    it consists of ``openpgpkey.`` followed by the name that
    ``publicsuffix2.get_sld`` reports for it and a DNS-over-HTTPS lookup at
    cloudflare returns exactly one CNAME record for it that points to the
    configured gateway domain.

    Args:
        domain: domain name taken from the request.
        timeout: seconds to wait for the DoH resolver before giving up.

    Returns:
        A message string when the domain is accepted, or a ``(message, 400)``
        tuple when it is rejected or the lookup could not be completed.
    """
    # check allowlist of domains. we don't allow arbitrary subdomains for abuse
    # reasons, but other entries are generally possible. just ask.
    if domain in app.config['EXPLICIT_ALLOWED_DOMAINS']:
        return 'ok: domain is allowlisted\n'

    if not domain.startswith('openpgpkey.'):
        return 'domain must have "openpgpkey" prefix\n', 400

    # A falsy get_sld() result is treated like any other mismatch instead of
    # failing on the string concatenation.
    sld = publicsuffix2.get_sld(domain)
    if not sld or domain != 'openpgpkey.' + sld:
        return 'subdomains can only be used upon request. send an email to <tt>support at keys dot openpgp dot org</tt>\n', 400

    # Bound to a local so the f-strings below do not need nested quotes of the
    # same kind, which is a syntax error before Python 3.12.
    gateway = app.config['GATEWAY_DOMAIN']

    try:
        req = requests.get(
            'https://cloudflare-dns.com/dns-query',
            params={
                'name': domain,
                'type': 'CNAME',
            },
            headers={
                'accept': 'application/dns-json',
            },
            # Without a timeout a stalled resolver would block this worker
            # indefinitely.
            timeout=timeout,
        )
    except requests.RequestException as err:
        app.logger.debug(f'dns error: {err}')
        return 'CNAME lookup failed (resolver unreachable)\n', 400
    app.logger.debug(f'lookup url: {req.url}')

    if req.status_code != 200:
        app.logger.debug(f'dns error: {req.status_code} {req.text})')
        # Returned like every other failure (rather than flask.abort) so the
        # caller receives and logs the same kind of result on all paths.
        return f'CNAME lookup failed (http {req.status_code})\n', 400

    try:
        response = req.json()
    except ValueError:
        # The body was not valid JSON.
        return 'CNAME lookup failed (invalid response)\n', 400
    app.logger.debug(f'response json: {response}')

    if not isinstance(response, dict) or 'Status' not in response:
        return 'CNAME lookup failed (no status)\n', 400
    if response['Status'] != 0:
        return 'CNAME lookup failed (invalid domain?)\n', 400
    if 'Answer' not in response:
        return 'CNAME lookup failed: no CNAME record set\n', 400
    answers = response['Answer']
    if len(answers) != 1:
        return 'CNAME lookup failed: ambiguous answer section\n', 400

    answer = answers[0]
    # 5 is the DNS resource record type number of CNAME.
    if answer.get('type') != 5:
        return 'CNAME lookup failed: unexpected response (record type)\n', 400

    answered_name = answer.get('name')
    if answered_name not in (domain, f'{domain}.'):
        # Report the name the resolver actually answered for.
        return f'CNAME lookup failed: unexpected response (domain response was for {markupsafe.escape(answered_name)})\n', 400

    # Require the CNAME target to *be* the gateway (ignoring a trailing root
    # dot and letter case). A prefix match would also accept names such as
    # "<gateway>.attacker.example".
    target = str(answer.get('data', ''))
    if target.rstrip('.').lower() != gateway.rstrip('.').lower():
        return f'CNAME lookup failed: {markupsafe.escape(domain)} resolves to {markupsafe.escape(target)} (expected {gateway})\n', 400

    return f'CNAME lookup ok: {markupsafe.escape(domain)} resolves to {gateway}\n'
|
|
|
|
if __name__ != '__main__':
    # Imported as a module rather than run as a script: hand the app's log
    # output to the handlers and level of the 'gunicorn.error' logger.
    import logging

    gunicorn_logger = logging.getLogger('gunicorn.error')
    app.logger.handlers = gunicorn_logger.handlers
    app.logger.setLevel(gunicorn_logger.level)
else:
    # Run directly as a script: start Flask's built-in server.
    app.run()
|