diff --git a/openstack/config/__init__.py b/openstack/config/__init__.py index 5b9d3d417..f11ead173 100644 --- a/openstack/config/__init__.py +++ b/openstack/config/__init__.py @@ -22,6 +22,13 @@ if ty.TYPE_CHECKING: from openstack.config import cloud_region +__all__ = [ + 'OpenStackConfig', + 'cloud_region', + 'get_cloud_region', +] + + # TODO(stephenfin): Expand kwargs once we've typed OpenstackConfig.get_one def get_cloud_region( service_key: str | None = None, @@ -53,7 +60,8 @@ def get_cloud_region( app_version=app_version, ) if options: - config.register_argparse_arguments(options, sys.argv, service_key) + service_keys = [service_key] if service_key is not None else [] + config.register_argparse_arguments(options, sys.argv, service_keys) parsed_options, _ = options.parse_known_args(sys.argv) else: parsed_options = None diff --git a/openstack/config/loader.py b/openstack/config/loader.py index 836fab380..3f8871d4b 100644 --- a/openstack/config/loader.py +++ b/openstack/config/loader.py @@ -40,6 +40,7 @@ from openstack import warnings as os_warnings if ty.TYPE_CHECKING: from keystoneauth1.loading._plugins.identity import v3 as v3_loaders + from keystoneauth1.loading import opts PLATFORMDIRS = platformdirs.PlatformDirs( 'openstack', 'OpenStack', multipath=True @@ -95,7 +96,7 @@ CSV_KEYS = ('auth_methods',) FORMAT_EXCLUSIONS = frozenset(['password']) -def get_boolean(value): +def get_boolean(value: ty.Any) -> bool: if value is None: return False if type(value) is bool: @@ -105,7 +106,9 @@ def get_boolean(value): return False -def _auth_update(old_dict, new_dict_source): +def _auth_update( + old_dict: dict[str, ty.Any], new_dict_source: dict[str, ty.Any] +) -> dict[str, ty.Any]: """Like dict.update, except handling the nested dict called auth.""" new_dict = copy.deepcopy(new_dict_source) for k, v in new_dict.items(): @@ -119,7 +122,7 @@ def _auth_update(old_dict, new_dict_source): return old_dict -def _fix_argv(argv): +def _fix_argv(argv: list[str]) -> None: # Transform any _ characters in arg names to - so that we don't # have to throw billions of compat argparse arguments around all # over the place. @@ -156,6 +159,15 @@ class OpenStackConfig: _cloud_region_class = cloud_region.CloudRegion _defaults_module = defaults + #: config_filename is the filename that configuration was loaded from, if + #: any. + config_filename: str | None + #: secure_config_filename is the filename that secure configuration was + #: loaded from, if any. + secure_config_filename: str | None + #: cloud_config contains the combined loaded configuration. + cloud_config: dict[str, ty.Any] + def __init__( self, config_files: list[str] | None = None, @@ -213,21 +225,25 @@ class OpenStackConfig: self.defaults.update(override_defaults) # First, use a config file if it exists where expected - self.config_filename, self.cloud_config = self._load_config_file() - if self.config_filename: - self._validate_config_file(self.config_filename, self.cloud_config) + config_filename, cloud_config = self._load_config_file() + if config_filename and cloud_config: + self._validate_config_file(config_filename, cloud_config) secure_config_filename, secure_config = self._load_secure_file() - if secure_config: + if secure_config_filename and secure_config: self._validate_config_file(secure_config_filename, secure_config) - self.cloud_config = _util.merge_clouds( - self.cloud_config, secure_config + cloud_config = _util.merge_clouds( + cloud_config or {}, secure_config ) - if not self.cloud_config: + self.config_filename = config_filename + self.secure_config_filename = secure_config_filename + if not cloud_config: self.cloud_config = {'clouds': {}} - elif 'clouds' not in self.cloud_config: - self.cloud_config['clouds'] = {} + else: + self.cloud_config = cloud_config + if 'clouds' not in self.cloud_config: + self.cloud_config['clouds'] = {} # Save the other config self.extra_config = copy.deepcopy(self.cloud_config) @@ -400,7 +416,9 @@ class OpenStackConfig: # password = self._pw_callback(prompt="Password: ") self._pw_callback = pw_func - def _get_os_environ(self, envvar_prefix=None): + def _get_os_environ( + self, envvar_prefix: str | None = None + ) -> dict[str, ty.Any] | None: ret = self._defaults_module.get_defaults() if not envvar_prefix: # This makes the or below be OS_ or OS_ which is a no-op @@ -429,12 +447,14 @@ class OpenStackConfig: return ret return None - def _get_envvar(self, key, default=None): + def _get_envvar(self, key: str, default: str | None = None) -> str | None: if not self._load_envvars: return default return os.environ.get(key, default) - def get_extra_config(self, key, defaults=None): + def get_extra_config( + self, key: str, defaults: dict[str, ty.Any] | None = None + ) -> dict[str, ty.Any]: """Fetch an arbitrary extra chunk of config, laying in defaults. :param string key: name of the config section to fetch @@ -442,22 +462,31 @@ class OpenStackConfig: found config """ defaults = _util.normalize_keys(defaults or {}) + assert defaults is not None # narrow type if not key: return defaults return _util.merge_clouds( defaults, _util.normalize_keys(self.cloud_config.get(key, {})) ) - def _load_config_file(self): + def _load_config_file( + self, + ) -> tuple[str, dict[str, ty.Any]] | tuple[None, None]: return self._load_yaml_json_file(self._config_files) - def _load_secure_file(self): + def _load_secure_file( + self, + ) -> tuple[str, dict[str, ty.Any]] | tuple[None, None]: return self._load_yaml_json_file(self._secure_files) - def _load_vendor_file(self): + def _load_vendor_file( + self, + ) -> tuple[str, dict[str, ty.Any]] | tuple[None, None]: return self._load_yaml_json_file(self._vendor_files) - def _load_yaml_json_file(self, filelist): + def _load_yaml_json_file( + self, filelist: list[str] + ) -> tuple[str, dict[str, ty.Any]] | tuple[None, None]: for path in filelist: if os.path.exists(path): try: @@ -471,7 +500,7 @@ class OpenStackConfig: # Can't access file so let's continue to the next # file continue - return (None, {}) + return (None, None) def _validate_config_file(self, path: str, data: ty.Any) -> bool: """Validate config file contains a clouds entry. @@ -492,10 +521,12 @@ class OpenStackConfig: return True - def _expand_region_name(self, region_name): + def _expand_region_name(self, region_name: str) -> dict[str, ty.Any]: return {'name': region_name, 'values': {}} - def _expand_regions(self, regions): + def _expand_regions( + self, regions: list[str | dict[str, ty.Any]] + ) -> list[dict[str, ty.Any]]: ret = [] for region in regions: if isinstance(region, dict): @@ -513,7 +544,7 @@ class OpenStackConfig: ret.append(self._expand_region_name(region)) return ret - def _get_regions(self, cloud): + def _get_regions(self, cloud: str) -> list[dict[str, ty.Any]]: if cloud not in self.cloud_config['clouds']: return [self._expand_region_name('')] regions = self._get_known_regions(cloud) @@ -522,7 +553,7 @@ class OpenStackConfig: regions = [self._expand_region_name('')] return regions - def _get_known_regions(self, cloud): + def _get_known_regions(self, cloud: str) -> list[dict[str, ty.Any]]: config = _util.normalize_keys(self.cloud_config['clouds'][cloud]) if 'regions' in config: return self._expand_regions(config['regions']) @@ -549,9 +580,14 @@ class OpenStackConfig: elif 'region_name' in new_cloud and new_cloud['region_name']: return [self._expand_region_name(new_cloud['region_name'])] - def _get_region(self, cloud=None, region_name=''): + return [] + + def _get_region( + self, cloud: str | None = None, region_name: str = '' + ) -> dict[str, ty.Any]: if region_name is None: region_name = '' + if not cloud: return self._expand_region_name(region_name) @@ -576,11 +612,13 @@ class OpenStackConfig: ) ) - def get_cloud_names(self): - return self.cloud_config['clouds'].keys() + def get_cloud_names(self) -> list[str]: + return list(self.cloud_config['clouds'].keys()) - def _get_base_cloud_config(self, name, profile=None): - cloud = dict() + def _get_base_cloud_config( + self, name: str | None, profile: str | None = None + ) -> dict[str, ty.Any]: + cloud = {} # Only validate cloud name if one was given if name and name not in self.cloud_config['clouds']: @@ -595,7 +633,7 @@ class OpenStackConfig: self._expand_vendor_profile(name, cloud, our_cloud) if 'auth' not in cloud: - cloud['auth'] = dict() + cloud['auth'] = {} _auth_update(cloud, our_cloud) if 'cloud' in cloud: @@ -603,7 +641,12 @@ class OpenStackConfig: return cloud - def _expand_vendor_profile(self, name, cloud, our_cloud): + def _expand_vendor_profile( + self, + name: str | None, + cloud: dict[str, ty.Any], + our_cloud: dict[str, ty.Any], + ) -> None: # Expand a profile if it exists. 'cloud' is an old confusing name # for this. profile_name = our_cloud.get('profile', our_cloud.get('cloud', None)) @@ -653,7 +696,7 @@ class OpenStackConfig: os_warnings.ConfigurationWarning, ) - def _project_scoped(self, cloud): + def _project_scoped(self, cloud: dict[str, ty.Any]) -> bool: return ( 'project_id' in cloud or 'project_name' in cloud @@ -661,7 +704,9 @@ class OpenStackConfig: or 'project_name' in cloud['auth'] ) - def _validate_networks(self, networks, key): + def _validate_networks( + self, networks: list[dict[str, ty.Any]], key: str + ) -> None: value = None for net in networks: if value and net[key]: @@ -674,7 +719,9 @@ class OpenStackConfig: if not value and net[key]: value = net - def _fix_backwards_networks(self, cloud): + def _fix_backwards_networks( + self, cloud: dict[str, ty.Any] + ) -> dict[str, ty.Any]: # Leave the external_network and internal_network keys in the # dict because consuming code might be expecting them. networks = [] @@ -733,7 +780,7 @@ class OpenStackConfig: cloud['networks'] = networks return cloud - def _handle_domain_id(self, cloud): + def _handle_domain_id(self, cloud: dict[str, ty.Any]) -> dict[str, ty.Any]: # Allow people to just specify domain once if it's the same mappings = { 'domain_id': ('user_domain_id', 'project_domain_id'), @@ -751,7 +798,9 @@ class OpenStackConfig: cloud['auth'].pop(target_key, None) return cloud - def _fix_backwards_auth(self, cloud): + def _fix_backwards_auth( + self, cloud: dict[str, ty.Any] + ) -> dict[str, ty.Any]: mappings = { 'domain_id': ('domain_id', 'domain-id'), 'domain_name': ('domain_name', 'domain-name'), @@ -810,7 +859,9 @@ class OpenStackConfig: cloud['auth'][target_key] = target return cloud - def _fix_backwards_auth_plugin(self, cloud): + def _fix_backwards_auth_plugin( + self, cloud: dict[str, ty.Any] + ) -> dict[str, ty.Any]: # Do the lists backwards so that auth_type is the ultimate winner mappings = { 'auth_type': ('auth_plugin', 'auth_type'), @@ -828,7 +879,12 @@ class OpenStackConfig: # completely broken return cloud - def register_argparse_arguments(self, parser, argv, service_keys=None): + def register_argparse_arguments( + self, + parser: argparse_mod.ArgumentParser, + argv: list[str], + service_keys: list[str] | None = None, + ) -> None: """Register all of the common argparse options needed. Given an argparse parser, register the keystoneauth Session arguments, @@ -937,7 +993,9 @@ class OpenStackConfig: parser.add_argument('--os-endpoint-type', help=argparse_mod.SUPPRESS) parser.add_argument('--endpoint-type', help=argparse_mod.SUPPRESS) - def _fix_backwards_interface(self, cloud): + def _fix_backwards_interface( + self, cloud: dict[str, ty.Any] + ) -> dict[str, ty.Any]: new_cloud = {} for key in cloud.keys(): if key.endswith('endpoint_type'): @@ -947,7 +1005,9 @@ class OpenStackConfig: new_cloud[target_key] = cloud[key] return new_cloud - def _fix_backwards_api_timeout(self, cloud): + def _fix_backwards_api_timeout( + self, cloud: dict[str, ty.Any] + ) -> dict[str, ty.Any]: new_cloud = {} # requests can only have one timeout, which means that in a single # cloud there is no point in different timeout values. However, @@ -972,7 +1032,7 @@ class OpenStackConfig: new_cloud['api_timeout'] = new_cloud.pop('timeout') return new_cloud - def get_all(self): + def get_all(self) -> list[cloud_region.CloudRegion]: clouds = [] for cloud in self.get_cloud_names(): @@ -983,7 +1043,7 @@ class OpenStackConfig: ) return clouds - def get_all_clouds(self): + def get_all_clouds(self) -> list[cloud_region.CloudRegion]: warnings.warn( "The 'get_all_clouds' method is a deprecated alias for " "'get_clouds' and will be removed in a future release.", @@ -991,7 +1051,11 @@ class OpenStackConfig: ) return self.get_all() - def _fix_args(self, args=None, argparse=None): + def _fix_args( + self, + args: dict[str, ty.Any] | None = None, + argparse: argparse_mod.Namespace | None = None, + ) -> dict[str, ty.Any]: """Massage the passed-in options Replace - with _ and strip os_ prefixes. @@ -1011,10 +1075,10 @@ class OpenStackConfig: parsed_args[k] = o_dict[k] args.update(parsed_args) - os_args = dict() - new_args = dict() + os_args = {} + new_args = {} for key, val in iter(args.items()): - if type(args[key]) is dict: + if isinstance(args[key], dict): # dive into the auth dict new_args[key] = self._fix_args(args[key]) continue @@ -1027,7 +1091,9 @@ class OpenStackConfig: new_args.update(os_args) return new_args - def _find_winning_auth_value(self, opt, config): + def _find_winning_auth_value( + self, opt: 'opts.Opt', config: dict[str, dict[str, ty.Any]] + ) -> dict[str, ty.Any] | None: opt_name = opt.name.replace('-', '_') if opt_name in config: return config[opt_name] @@ -1040,7 +1106,9 @@ class OpenStackConfig: if d_opt_name in config: return config[d_opt_name] - def auth_config_hook(self, config): + return None + + def auth_config_hook(self, config: dict[str, ty.Any]) -> dict[str, ty.Any]: """Allow examination of config values before loading auth plugin OpenStackClient will override this to perform additional checks @@ -1048,7 +1116,9 @@ class OpenStackConfig: """ return config - def _get_auth_loader(self, config): + def _get_auth_loader( + self, config: dict[str, ty.Any] + ) -> loading.BaseLoader[ty.Any]: # Use the 'none' plugin for variants of None specified, # since it does not look up endpoints or tokens but rather # does a passthrough. This is useful for things like Ironic @@ -1087,7 +1157,9 @@ class OpenStackConfig: return loader - def _validate_auth(self, config, loader): + def _validate_auth( + self, config: dict[str, ty.Any], loader: loading.BaseLoader[ty.Any] + ) -> dict[str, ty.Any]: # May throw a keystoneauth1.exceptions.NoMatchingPlugin plugin_options = loader.get_options() @@ -1127,7 +1199,9 @@ class OpenStackConfig: return config - def _validate_auth_correctly(self, config, loader): + def _validate_auth_correctly( + self, config: dict[str, ty.Any], loader: loading.BaseLoader[ty.Any] + ) -> dict[str, ty.Any]: # May throw a keystoneauth1.exceptions.NoMatchingPlugin plugin_options = loader.get_options() @@ -1158,7 +1232,9 @@ class OpenStackConfig: return config - def option_prompt(self, config, p_opt): + def option_prompt( + self, config: dict[str, ty.Any], p_opt: 'opts.Opt' + ) -> dict[str, ty.Any]: """Prompt user for option that requires a value""" if ( getattr(p_opt, 'prompt', None) is not None @@ -1168,7 +1244,12 @@ class OpenStackConfig: config['auth'][p_opt.dest] = self._pw_callback(p_opt.prompt) return config - def _clean_up_after_ourselves(self, config, p_opt, winning_value): + def _clean_up_after_ourselves( + self, + config: dict[str, ty.Any], + p_opt: 'opts.Opt', + winning_value: ty.Any, + ) -> dict[str, ty.Any]: # Clean up after ourselves for opt in [p_opt.name] + [o.name for o in p_opt.deprecated]: opt = opt.replace('-', '_') @@ -1184,7 +1265,9 @@ class OpenStackConfig: config['auth'][p_opt.dest] = winning_value return config - def _handle_value_types(self, config: dict) -> dict: + def _handle_value_types( + self, config: dict[str, ty.Any] + ) -> dict[str, ty.Any]: for key in BOOL_KEYS: if key in config: if not isinstance(config[key], bool): @@ -1196,7 +1279,7 @@ class OpenStackConfig: config[key] = config[key].split(',') return config - def magic_fixes(self, config): + def magic_fixes(self, config: dict[str, ty.Any]) -> dict[str, ty.Any]: """Perform the set of magic argument fixups""" # These backwards compat values are only set via argparse. If it's @@ -1345,8 +1428,12 @@ class OpenStackConfig: ) def get_one_cloud( - self, cloud=None, validate=True, argparse=None, **kwargs - ): + self, + cloud: str | None = None, + validate: bool = True, + argparse: argparse_mod.Namespace | None = None, + **kwargs: ty.Any, + ) -> cloud_region.CloudRegion: warnings.warn( "The 'get_one_cloud' method is a deprecated alias for 'get_one' " "and will be removed in a future release.", @@ -1360,8 +1447,12 @@ class OpenStackConfig: ) def get_one_cloud_osc( - self, cloud=None, validate=True, argparse=None, **kwargs - ): + self, + cloud: str | None = None, + validate: bool = True, + argparse: argparse_mod.Namespace | None = None, + **kwargs: ty.Any, + ) -> cloud_region.CloudRegion: """Retrieve a single CloudRegion and merge additional options :param string cloud: @@ -1456,7 +1547,11 @@ class OpenStackConfig: ) @staticmethod - def set_one_cloud(config_file, cloud, set_config=None): + def set_one_cloud( + config_file: str, + cloud: str, + set_config: dict[str, ty.Any] | None = None, + ) -> None: """Set a single cloud configuration. :param string config_file: diff --git a/pyproject.toml b/pyproject.toml index 732bec82f..e204658e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ module = [ "openstack.config._util", "openstack.config.defaults", "openstack.config.cloud_region", + "openstack.config.loader", "openstack.config.vendors", "openstack.connection", "openstack.exceptions", diff --git a/tools/nova_version.py b/tools/nova_version.py index e5593fbc9..65546a9f4 100644 --- a/tools/nova_version.py +++ b/tools/nova_version.py @@ -24,6 +24,8 @@ for cloud in openstack.config.OpenStackConfig().get_all_clouds(): try: raw_endpoint = c.get_endpoint() have_current = False + if raw_endpoint is None: + raise Exception('endpoint was empty') endpoint = raw_endpoint.rsplit('/', 2)[0] print(endpoint) r = c.get(endpoint).json()