From 9ce6c0d8e92fcc2296fc5dce585415090c4424f0 Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Thu, 2 Jul 2020 14:12:47 +0100 Subject: [PATCH] crypto: Add type hints Nothing to complicated here, other than working around mypy's dislike of changing variable types and inability to process 'if six.PY3' conditional blocks because it doesn't run the code. Part of blueprint add-emulated-virtual-tpm Change-Id: I805eaa8b6fb55ce9cbc8f6b8b777af48302ba2ba Signed-off-by: Stephen Finucane --- mypy-files.txt | 1 + nova/crypto.py | 62 +++++++++++++++++++++++++------------------------- 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/mypy-files.txt b/mypy-files.txt index 181e207bdcfc..a2085f8f4a26 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -1,3 +1,4 @@ +nova/crypto.py nova/virt/driver.py nova/virt/hardware.py nova/virt/libvirt/__init__.py diff --git a/nova/crypto.py b/nova/crypto.py index f195bce4fa52..b927f0c255f5 100644 --- a/nova/crypto.py +++ b/nova/crypto.py @@ -21,7 +21,9 @@ Includes root and intermediate CAs, SSH key_pairs and x509 certificates. import base64 import binascii +import io import os +import typing as ty from cryptography.hazmat import backends from cryptography.hazmat.primitives.asymmetric import padding @@ -31,7 +33,6 @@ from cryptography import x509 from oslo_concurrency import processutils from oslo_log import log as logging import paramiko -import six import nova.conf from nova import exception @@ -44,7 +45,7 @@ LOG = logging.getLogger(__name__) CONF = nova.conf.CONF -def generate_fingerprint(public_key): +def generate_fingerprint(public_key: str) -> str: try: pub_bytes = public_key.encode('utf-8') # Test that the given public_key string is a proper ssh key. The @@ -56,24 +57,22 @@ def generate_fingerprint(public_key): digest = hashes.Hash(hashes.MD5(), backends.default_backend()) digest.update(pub_data) md5hash = digest.finalize() - raw_fp = binascii.hexlify(md5hash) - if six.PY3: - raw_fp = raw_fp.decode('ascii') + raw_fp = binascii.hexlify(md5hash).decode('ascii') return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2])) except Exception: raise exception.InvalidKeypair( reason=_('failed to generate fingerprint')) -def generate_x509_fingerprint(pem_key): +def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str: try: - if isinstance(pem_key, six.text_type): + if isinstance(pem_key, str): pem_key = pem_key.encode('utf-8') cert = x509.load_pem_x509_certificate( pem_key, backends.default_backend()) - raw_fp = binascii.hexlify(cert.fingerprint(hashes.SHA1())) - if six.PY3: - raw_fp = raw_fp.decode('ascii') + raw_fp = binascii.hexlify( + cert.fingerprint(hashes.SHA1()) + ).decode('ascii') return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2])) except (ValueError, TypeError, binascii.Error) as ex: raise exception.InvalidKeypair( @@ -81,9 +80,9 @@ def generate_x509_fingerprint(pem_key): 'Error message: %s') % ex) -def generate_key_pair(bits=2048): +def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]: key = paramiko.RSAKey.generate(bits) - keyout = six.StringIO() + keyout = io.StringIO() key.write_private_key(keyout) private_key = keyout.getvalue() public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64()) @@ -91,12 +90,12 @@ def generate_key_pair(bits=2048): return (private_key, public_key, fingerprint) -def ssh_encrypt_text(ssh_public_key, text): +def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes: """Encrypt text with an ssh public key. If text is a Unicode string, encode it to UTF-8. """ - if isinstance(text, six.text_type): + if isinstance(text, str): text = text.encode('utf-8') try: pub_bytes = ssh_public_key.encode('utf-8') @@ -104,10 +103,13 @@ def ssh_encrypt_text(ssh_public_key, text): pub_bytes, backends.default_backend()) return pub_key.encrypt(text, padding.PKCS1v15()) except Exception as exc: - raise exception.EncryptionFailure(reason=six.text_type(exc)) + raise exception.EncryptionFailure(reason=str(exc)) -def generate_winrm_x509_cert(user_id, bits=2048): +def generate_winrm_x509_cert( + user_id: str, + bits: int = 2048 +) -> ty.Tuple[str, str, str]: """Generate a cert for passwordless auth for user in project.""" subject = '/CN=%s' % user_id upn = '%s@localhost' % user_id @@ -118,28 +120,26 @@ def generate_winrm_x509_cert(user_id, bits=2048): _create_x509_openssl_config(conffile, upn) - (certificate, _err) = processutils.execute( - 'openssl', 'req', '-x509', '-nodes', '-days', '3650', - '-config', conffile, '-newkey', 'rsa:%s' % bits, - '-outform', 'PEM', '-keyout', keyfile, '-subj', subject, - '-extensions', 'v3_req_client', - binary=True) + out, _ = processutils.execute( + 'openssl', 'req', '-x509', '-nodes', '-days', '3650', + '-config', conffile, '-newkey', 'rsa:%s' % bits, + '-outform', 'PEM', '-keyout', keyfile, '-subj', subject, + '-extensions', 'v3_req_client', + binary=True) - (out, _err) = processutils.execute('openssl', 'pkcs12', '-export', - '-inkey', keyfile, '-password', 'pass:', - process_input=certificate, - binary=True) + certificate = out.decode('utf-8') - private_key = base64.b64encode(out) + out, _ = processutils.execute( + 'openssl', 'pkcs12', '-export', '-inkey', keyfile, '-password', + 'pass:', process_input=out, binary=True) + + private_key = base64.b64encode(out).decode('ascii') fingerprint = generate_x509_fingerprint(certificate) - if six.PY3: - private_key = private_key.decode('ascii') - certificate = certificate.decode('utf-8') return (private_key, certificate, fingerprint) -def _create_x509_openssl_config(conffile, upn): +def _create_x509_openssl_config(conffile: str, upn: str): content = ("distinguished_name = req_distinguished_name\n" "[req_distinguished_name]\n" "[v3_req_client]\n"