| #!/usr/bin/env vpython3 |
| |
| # -*- coding:utf-8 -*- |
| # Copyright (c) 2023 The WebRTC project authors. All Rights Reserved. |
| # |
| # Use of this source code is governed by a BSD-style license |
| # that can be found in the LICENSE file in the root of the source |
| # tree. An additional intellectual property rights grant can be found |
| # in the file PATENTS. All contributing project authors may |
| # be found in the AUTHORS file in the root of the source tree. |
| import argparse |
| import logging |
| from pathlib import Path |
| import tempfile |
| from typing import Tuple, Any, List, ByteString |
| from datetime import datetime, timezone |
| from hashlib import sha256 |
| from urllib.request import urlopen |
| from asn1crypto import pem, x509 |
| |
| _GENERATED_FILE = 'ssl_roots.h' |
| |
| def main(): |
| parser = argparse.ArgumentParser( |
| description='This is a tool to transform a crt file ' |
| f'into a C/C++ header: {_GENERATED_FILE}.') |
| |
| parser.add_argument('source_path_or_url', |
| help='File path or URL to PEM storage file. ' |
| 'The supported cert files are: ' |
| '- Google: https://pki.goog/roots.pem; ' |
| '- Mozilla: https://curl.se/ca/cacert.pem') |
| parser.add_argument('-v', |
| '--verbose', |
| dest='verbose', |
| action='store_true', |
| help='Print output while running') |
| parser.add_argument('-f', |
| '--full_cert', |
| dest='full_cert', |
| action='store_true', |
| help='Add public key and certificate name. ' |
| 'Default is to skip and reduce generated file size.') |
| args = parser.parse_args() |
| logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING) |
| |
| with tempfile.TemporaryDirectory() as temp_dir: |
| cert_file = Path(temp_dir) / "cacert.pem" |
| |
| if args.source_path_or_url.startswith( |
| 'https://') or args.source_path_or_url.startswith('http://'): |
| _DownloadCertificatesStore(args.source_path_or_url, cert_file) |
| destination_dir = Path.cwd() |
| else: |
| source_path = Path(args.source_path_or_url) |
| cert_file.write_bytes(source_path.read_bytes()) |
| destination_dir = source_path.parent |
| |
| logging.debug('Stored certificate from %s into %s', args.source_path_or_url, |
| cert_file) |
| |
| header_file = destination_dir / _GENERATED_FILE |
| |
| digest, certificates = _LoadCertificatesStore(cert_file) |
| _GenerateCHeader(header_file, args.source_path_or_url, digest, certificates, |
| args.full_cert) |
| |
| logging.debug('Did generate %s from %s [%s]', header_file, |
| args.source_path_or_url, digest) |
| |
| |
| def _DownloadCertificatesStore(pem_url: str, destination_file: Path): |
| with urlopen(pem_url) as response: |
| pem_file = response.read() |
| logging.info('Got response with status [%d]: %s', response.status, pem_url) |
| |
| if destination_file.parent.exists(): |
| logging.debug('Creating directory and it\'s parents %s', |
| destination_file.parent) |
| destination_file.parent.mkdir(parents=True, exist_ok=True) |
| if destination_file.exists(): |
| logging.debug('Unlink existing file %s', destination_file) |
| destination_file.unlink(missing_ok=True) |
| |
| destination_file.write_bytes(pem_file) |
| logging.info('Stored downloaded %d bytes pem file to `%s`', len(pem_file), |
| destination_file) |
| |
| |
| def _LoadCertificatesStore( |
| source_file: Path) -> Tuple[str, List[x509.Certificate]]: |
| pem_bytes = source_file.read_bytes() |
| |
| certificates = [ |
| x509.Certificate.load(der) |
| for type, _, der in pem.unarmor(pem_bytes, True) if type == 'CERTIFICATE' |
| ] |
| digest = f'sha256:{sha256(pem_bytes).hexdigest()}' |
| logging.debug('Loaded %d certificates from %s [%s] ', len(certificates), |
| source_file, digest) |
| return digest, certificates |
| |
| |
| def _GenerateCHeader(header_file: Path, source: str, source_digest: str, |
| certificates: List[x509.Certificate], full_cert: bool): |
| header_file.parent.mkdir(parents=True, exist_ok=True) |
| with header_file.open('w') as output: |
| output.write(_CreateOutputHeader(source, source_digest)) |
| |
| named_certificates = [(cert, |
| f'kCertificateWithFingerprint_{cert.sha256.hex()}') |
| for cert in certificates] |
| |
| names = list(map(lambda x: x[1], named_certificates)) |
| unique_names = list(set(names)) |
| if len(names) != len(unique_names): |
| raise RuntimeError( |
| f'There are {len(names) - len(unique_names)} non-unique ' |
| 'certificate names generated. Generator script must be ' |
| 'fixed to handle collision.') |
| |
| for cert, name in named_certificates: |
| |
| output.write(_CreateCertificateMetadataHeader(cert)) |
| |
| if full_cert: |
| output.write( |
| _CArrayConstantDefinition('unsigned char', |
| f'{name}_subject_name', |
| _CreateHexList(cert.subject.dump()), |
| max_items_per_line=16)) |
| output.write('\n') |
| output.write( |
| _CArrayConstantDefinition('unsigned char', |
| f'{name}_public_key', |
| _CreateHexList(cert.public_key.dump()), |
| max_items_per_line=16)) |
| output.write('\n') |
| |
| output.write( |
| _CArrayConstantDefinition('unsigned char', |
| f'{name}_certificate', |
| _CreateHexList(cert.dump()), |
| max_items_per_line=16)) |
| output.write('\n\n') |
| |
| if full_cert: |
| output.write( |
| _CArrayConstantDefinition('unsigned char* const', |
| 'kSSLCertSubjectNameList', |
| [f'{name}_subject_name' for name in names])) |
| output.write('\n\n') |
| |
| output.write( |
| _CArrayConstantDefinition('unsigned char* const', |
| 'kSSLCertPublicKeyList', |
| [f'{name}_public_key' for name in names])) |
| output.write('\n\n') |
| |
| output.write( |
| _CArrayConstantDefinition('unsigned char* const', |
| 'kSSLCertCertificateList', |
| [f'{name}_certificate' for name in names])) |
| output.write('\n\n') |
| |
| output.write( |
| _CArrayConstantDefinition( |
| 'size_t', 'kSSLCertCertificateSizeList', |
| [f'{len(cert.dump())}' for cert, _ in named_certificates])) |
| output.write('\n\n') |
| |
| output.write(_CreateOutputFooter()) |
| |
| |
| def _CreateHexList(items: ByteString) -> List[str]: |
| """ |
| Produces list of strings each item is hex literal of byte of source sequence |
| """ |
| return [f'0x{item:02X}' for item in items] |
| |
| |
| def _CArrayConstantDefinition(type_name: str, |
| array_name: str, |
| items: List[Any], |
| max_items_per_line: int = 1) -> str: |
| """ |
| Produces C array definition like: `const type_name array_name = { items };` |
| """ |
| return (f'const {type_name} {array_name}[{len(items)}]=' |
| f'{_CArrayInitializerList(items, max_items_per_line)};') |
| |
| |
| def _CArrayInitializerList(items: List[Any], |
| max_items_per_line: int = 1) -> str: |
| """ |
| Produces C initializer list like: `{\\nitems[0], \\n ...}` |
| """ |
| return '{\n' + '\n'.join([ |
| ','.join(items[i:i + max_items_per_line]) + ',' |
| for i in range(0, len(items), max_items_per_line) |
| ]) + '\n}' |
| |
| |
| def _CreateCertificateMetadataHeader(cert: x509.Certificate) -> str: |
| return (f'/* subject: {cert.subject.human_friendly} */\n' |
| f'/* issuer: {cert.issuer.human_friendly} */\n' |
| f'/* link: https://crt.sh/?q={cert.sha256.hex()} */\n') |
| |
| |
| def _CreateOutputHeader(source_path_or_url: str, source_digest: str) -> str: |
| now_utc = datetime.now(timezone.utc).replace(microsecond=0) |
| output = ( |
| '/*\n' |
| f' * Copyright {now_utc.year} The WebRTC Project Authors. All rights ' |
| 'reserved.\n' |
| ' *\n' |
| ' * Use of this source code is governed by a BSD-style license\n' |
| ' * that can be found in the LICENSE file in the root of the ' |
| 'source\n' |
| ' * tree. An additional intellectual property rights grant can be ' |
| 'found\n' |
| ' * in the file PATENTS. All contributing project authors may\n' |
| ' * be found in the AUTHORS file in the root of the source tree.\n' |
| ' */\n\n' |
| '#ifndef RTC_BASE_SSL_ROOTS_H_\n' |
| '#define RTC_BASE_SSL_ROOTS_H_\n\n' |
| '// This file is the root certificates in C form.\n\n' |
| f'// It was generated at {now_utc.isoformat()} by the following script:\n' |
| '// `tools_webrtc/sslroots/generate_sslroots.py ' |
| f'{source_path_or_url}`\n\n' |
| '// clang-format off\n' |
| '// Don\'t bother formatting generated code,\n' |
| '// also it would breaks subject/issuer lines.\n\n' |
| f'// Source bundle `{source_path_or_url}` digest is [{source_digest}]\n\n' |
| ) |
| return output |
| |
| |
| def _CreateOutputFooter(): |
| return '// clang-format on\n\n#endif // RTC_BASE_SSL_ROOTS_H_\n' |
| |
| |
| if __name__ == '__main__': |
| main() |