typing: Annotate openstack.config.loader

Change-Id: I712bc5dc6ff43fd9508fcad1ddb86230e7c59c65
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2025-05-26 16:57:17 +01:00
parent 5a2b3bd81a
commit 550c15c83b
4 changed files with 167 additions and 61 deletions

View File

@@ -22,6 +22,13 @@ if ty.TYPE_CHECKING:
from openstack.config import cloud_region
__all__ = [
'OpenStackConfig',
'cloud_region',
'get_cloud_region',
]
# TODO(stephenfin): Expand kwargs once we've typed OpenstackConfig.get_one
def get_cloud_region(
service_key: str | None = None,
@@ -53,7 +60,8 @@ def get_cloud_region(
app_version=app_version,
)
if options:
config.register_argparse_arguments(options, sys.argv, service_key)
service_keys = [service_key] if service_key is not None else []
config.register_argparse_arguments(options, sys.argv, service_keys)
parsed_options, _ = options.parse_known_args(sys.argv)
else:
parsed_options = None

View File

@@ -40,6 +40,7 @@ from openstack import warnings as os_warnings
if ty.TYPE_CHECKING:
from keystoneauth1.loading._plugins.identity import v3 as v3_loaders
from keystoneauth1.loading import opts
PLATFORMDIRS = platformdirs.PlatformDirs(
'openstack', 'OpenStack', multipath=True
@@ -95,7 +96,7 @@ CSV_KEYS = ('auth_methods',)
FORMAT_EXCLUSIONS = frozenset(['password'])
def get_boolean(value):
def get_boolean(value: ty.Any) -> bool:
if value is None:
return False
if type(value) is bool:
@@ -105,7 +106,9 @@ def get_boolean(value):
return False
def _auth_update(old_dict, new_dict_source):
def _auth_update(
old_dict: dict[str, ty.Any], new_dict_source: dict[str, ty.Any]
) -> dict[str, ty.Any]:
"""Like dict.update, except handling the nested dict called auth."""
new_dict = copy.deepcopy(new_dict_source)
for k, v in new_dict.items():
@@ -119,7 +122,7 @@ def _auth_update(old_dict, new_dict_source):
return old_dict
def _fix_argv(argv):
def _fix_argv(argv: list[str]) -> None:
# Transform any _ characters in arg names to - so that we don't
# have to throw billions of compat argparse arguments around all
# over the place.
@@ -156,6 +159,15 @@ class OpenStackConfig:
_cloud_region_class = cloud_region.CloudRegion
_defaults_module = defaults
#: config_filename is the filename that configuration was loaded from, if
#: any.
config_filename: str | None
#: secure_config_filename is the filename that secure configuration was
#: loaded from, if any.
secure_config_filename: str | None
#: cloud_config contains the combined loaded configuration.
cloud_config: dict[str, ty.Any]
def __init__(
self,
config_files: list[str] | None = None,
@@ -213,21 +225,25 @@ class OpenStackConfig:
self.defaults.update(override_defaults)
# First, use a config file if it exists where expected
self.config_filename, self.cloud_config = self._load_config_file()
if self.config_filename:
self._validate_config_file(self.config_filename, self.cloud_config)
config_filename, cloud_config = self._load_config_file()
if config_filename and cloud_config:
self._validate_config_file(config_filename, cloud_config)
secure_config_filename, secure_config = self._load_secure_file()
if secure_config:
if secure_config_filename and secure_config:
self._validate_config_file(secure_config_filename, secure_config)
self.cloud_config = _util.merge_clouds(
self.cloud_config, secure_config
cloud_config = _util.merge_clouds(
cloud_config or {}, secure_config
)
if not self.cloud_config:
self.config_filename = config_filename
self.secure_config_filename = secure_config_filename
if not cloud_config:
self.cloud_config = {'clouds': {}}
elif 'clouds' not in self.cloud_config:
self.cloud_config['clouds'] = {}
else:
self.cloud_config = cloud_config
if 'clouds' not in self.cloud_config:
self.cloud_config['clouds'] = {}
# Save the other config
self.extra_config = copy.deepcopy(self.cloud_config)
@@ -400,7 +416,9 @@ class OpenStackConfig:
# password = self._pw_callback(prompt="Password: ")
self._pw_callback = pw_func
def _get_os_environ(self, envvar_prefix=None):
def _get_os_environ(
self, envvar_prefix: str | None = None
) -> dict[str, ty.Any] | None:
ret = self._defaults_module.get_defaults()
if not envvar_prefix:
# This makes the or below be OS_ or OS_ which is a no-op
@@ -429,12 +447,14 @@ class OpenStackConfig:
return ret
return None
def _get_envvar(self, key, default=None):
def _get_envvar(self, key: str, default: str | None = None) -> str | None:
if not self._load_envvars:
return default
return os.environ.get(key, default)
def get_extra_config(self, key, defaults=None):
def get_extra_config(
self, key: str, defaults: dict[str, ty.Any] | None = None
) -> dict[str, ty.Any]:
"""Fetch an arbitrary extra chunk of config, laying in defaults.
:param string key: name of the config section to fetch
@@ -442,22 +462,31 @@ class OpenStackConfig:
found config
"""
defaults = _util.normalize_keys(defaults or {})
assert defaults is not None # narrow type
if not key:
return defaults
return _util.merge_clouds(
defaults, _util.normalize_keys(self.cloud_config.get(key, {}))
)
def _load_config_file(self):
def _load_config_file(
self,
) -> tuple[str, dict[str, ty.Any]] | tuple[None, None]:
return self._load_yaml_json_file(self._config_files)
def _load_secure_file(self):
def _load_secure_file(
self,
) -> tuple[str, dict[str, ty.Any]] | tuple[None, None]:
return self._load_yaml_json_file(self._secure_files)
def _load_vendor_file(self):
def _load_vendor_file(
self,
) -> tuple[str, dict[str, ty.Any]] | tuple[None, None]:
return self._load_yaml_json_file(self._vendor_files)
def _load_yaml_json_file(self, filelist):
def _load_yaml_json_file(
self, filelist: list[str]
) -> tuple[str, dict[str, ty.Any]] | tuple[None, None]:
for path in filelist:
if os.path.exists(path):
try:
@@ -471,7 +500,7 @@ class OpenStackConfig:
# Can't access file so let's continue to the next
# file
continue
return (None, {})
return (None, None)
def _validate_config_file(self, path: str, data: ty.Any) -> bool:
"""Validate config file contains a clouds entry.
@@ -492,10 +521,12 @@ class OpenStackConfig:
return True
def _expand_region_name(self, region_name):
def _expand_region_name(self, region_name: str) -> dict[str, ty.Any]:
return {'name': region_name, 'values': {}}
def _expand_regions(self, regions):
def _expand_regions(
self, regions: list[str | dict[str, ty.Any]]
) -> list[dict[str, ty.Any]]:
ret = []
for region in regions:
if isinstance(region, dict):
@@ -513,7 +544,7 @@ class OpenStackConfig:
ret.append(self._expand_region_name(region))
return ret
def _get_regions(self, cloud):
def _get_regions(self, cloud: str) -> list[dict[str, ty.Any]]:
if cloud not in self.cloud_config['clouds']:
return [self._expand_region_name('')]
regions = self._get_known_regions(cloud)
@@ -522,7 +553,7 @@ class OpenStackConfig:
regions = [self._expand_region_name('')]
return regions
def _get_known_regions(self, cloud):
def _get_known_regions(self, cloud: str) -> list[dict[str, ty.Any]]:
config = _util.normalize_keys(self.cloud_config['clouds'][cloud])
if 'regions' in config:
return self._expand_regions(config['regions'])
@@ -549,9 +580,14 @@ class OpenStackConfig:
elif 'region_name' in new_cloud and new_cloud['region_name']:
return [self._expand_region_name(new_cloud['region_name'])]
def _get_region(self, cloud=None, region_name=''):
return []
def _get_region(
self, cloud: str | None = None, region_name: str = ''
) -> dict[str, ty.Any]:
if region_name is None:
region_name = ''
if not cloud:
return self._expand_region_name(region_name)
@@ -576,11 +612,13 @@ class OpenStackConfig:
)
)
def get_cloud_names(self):
return self.cloud_config['clouds'].keys()
def get_cloud_names(self) -> list[str]:
return list(self.cloud_config['clouds'].keys())
def _get_base_cloud_config(self, name, profile=None):
cloud = dict()
def _get_base_cloud_config(
self, name: str | None, profile: str | None = None
) -> dict[str, ty.Any]:
cloud = {}
# Only validate cloud name if one was given
if name and name not in self.cloud_config['clouds']:
@@ -595,7 +633,7 @@ class OpenStackConfig:
self._expand_vendor_profile(name, cloud, our_cloud)
if 'auth' not in cloud:
cloud['auth'] = dict()
cloud['auth'] = {}
_auth_update(cloud, our_cloud)
if 'cloud' in cloud:
@@ -603,7 +641,12 @@ class OpenStackConfig:
return cloud
def _expand_vendor_profile(self, name, cloud, our_cloud):
def _expand_vendor_profile(
self,
name: str | None,
cloud: dict[str, ty.Any],
our_cloud: dict[str, ty.Any],
) -> None:
# Expand a profile if it exists. 'cloud' is an old confusing name
# for this.
profile_name = our_cloud.get('profile', our_cloud.get('cloud', None))
@@ -653,7 +696,7 @@ class OpenStackConfig:
os_warnings.ConfigurationWarning,
)
def _project_scoped(self, cloud):
def _project_scoped(self, cloud: dict[str, ty.Any]) -> bool:
return (
'project_id' in cloud
or 'project_name' in cloud
@@ -661,7 +704,9 @@ class OpenStackConfig:
or 'project_name' in cloud['auth']
)
def _validate_networks(self, networks, key):
def _validate_networks(
self, networks: list[dict[str, ty.Any]], key: str
) -> None:
value = None
for net in networks:
if value and net[key]:
@@ -674,7 +719,9 @@ class OpenStackConfig:
if not value and net[key]:
value = net
def _fix_backwards_networks(self, cloud):
def _fix_backwards_networks(
self, cloud: dict[str, ty.Any]
) -> dict[str, ty.Any]:
# Leave the external_network and internal_network keys in the
# dict because consuming code might be expecting them.
networks = []
@@ -733,7 +780,7 @@ class OpenStackConfig:
cloud['networks'] = networks
return cloud
def _handle_domain_id(self, cloud):
def _handle_domain_id(self, cloud: dict[str, ty.Any]) -> dict[str, ty.Any]:
# Allow people to just specify domain once if it's the same
mappings = {
'domain_id': ('user_domain_id', 'project_domain_id'),
@@ -751,7 +798,9 @@ class OpenStackConfig:
cloud['auth'].pop(target_key, None)
return cloud
def _fix_backwards_auth(self, cloud):
def _fix_backwards_auth(
self, cloud: dict[str, ty.Any]
) -> dict[str, ty.Any]:
mappings = {
'domain_id': ('domain_id', 'domain-id'),
'domain_name': ('domain_name', 'domain-name'),
@@ -810,7 +859,9 @@ class OpenStackConfig:
cloud['auth'][target_key] = target
return cloud
def _fix_backwards_auth_plugin(self, cloud):
def _fix_backwards_auth_plugin(
self, cloud: dict[str, ty.Any]
) -> dict[str, ty.Any]:
# Do the lists backwards so that auth_type is the ultimate winner
mappings = {
'auth_type': ('auth_plugin', 'auth_type'),
@@ -828,7 +879,12 @@ class OpenStackConfig:
# completely broken
return cloud
def register_argparse_arguments(self, parser, argv, service_keys=None):
def register_argparse_arguments(
self,
parser: argparse_mod.ArgumentParser,
argv: list[str],
service_keys: list[str] | None = None,
) -> None:
"""Register all of the common argparse options needed.
Given an argparse parser, register the keystoneauth Session arguments,
@@ -937,7 +993,9 @@ class OpenStackConfig:
parser.add_argument('--os-endpoint-type', help=argparse_mod.SUPPRESS)
parser.add_argument('--endpoint-type', help=argparse_mod.SUPPRESS)
def _fix_backwards_interface(self, cloud):
def _fix_backwards_interface(
self, cloud: dict[str, ty.Any]
) -> dict[str, ty.Any]:
new_cloud = {}
for key in cloud.keys():
if key.endswith('endpoint_type'):
@@ -947,7 +1005,9 @@ class OpenStackConfig:
new_cloud[target_key] = cloud[key]
return new_cloud
def _fix_backwards_api_timeout(self, cloud):
def _fix_backwards_api_timeout(
self, cloud: dict[str, ty.Any]
) -> dict[str, ty.Any]:
new_cloud = {}
# requests can only have one timeout, which means that in a single
# cloud there is no point in different timeout values. However,
@@ -972,7 +1032,7 @@ class OpenStackConfig:
new_cloud['api_timeout'] = new_cloud.pop('timeout')
return new_cloud
def get_all(self):
def get_all(self) -> list[cloud_region.CloudRegion]:
clouds = []
for cloud in self.get_cloud_names():
@@ -983,7 +1043,7 @@ class OpenStackConfig:
)
return clouds
def get_all_clouds(self):
def get_all_clouds(self) -> list[cloud_region.CloudRegion]:
warnings.warn(
"The 'get_all_clouds' method is a deprecated alias for "
"'get_clouds' and will be removed in a future release.",
@@ -991,7 +1051,11 @@ class OpenStackConfig:
)
return self.get_all()
def _fix_args(self, args=None, argparse=None):
def _fix_args(
self,
args: dict[str, ty.Any] | None = None,
argparse: argparse_mod.Namespace | None = None,
) -> dict[str, ty.Any]:
"""Massage the passed-in options
Replace - with _ and strip os_ prefixes.
@@ -1011,10 +1075,10 @@ class OpenStackConfig:
parsed_args[k] = o_dict[k]
args.update(parsed_args)
os_args = dict()
new_args = dict()
os_args = {}
new_args = {}
for key, val in iter(args.items()):
if type(args[key]) is dict:
if isinstance(args[key], dict):
# dive into the auth dict
new_args[key] = self._fix_args(args[key])
continue
@@ -1027,7 +1091,9 @@ class OpenStackConfig:
new_args.update(os_args)
return new_args
def _find_winning_auth_value(self, opt, config):
def _find_winning_auth_value(
self, opt: 'opts.Opt', config: dict[str, dict[str, ty.Any]]
) -> dict[str, ty.Any] | None:
opt_name = opt.name.replace('-', '_')
if opt_name in config:
return config[opt_name]
@@ -1040,7 +1106,9 @@ class OpenStackConfig:
if d_opt_name in config:
return config[d_opt_name]
def auth_config_hook(self, config):
return None
def auth_config_hook(self, config: dict[str, ty.Any]) -> dict[str, ty.Any]:
"""Allow examination of config values before loading auth plugin
OpenStackClient will override this to perform additional checks
@@ -1048,7 +1116,9 @@ class OpenStackConfig:
"""
return config
def _get_auth_loader(self, config):
def _get_auth_loader(
self, config: dict[str, ty.Any]
) -> loading.BaseLoader[ty.Any]:
# Use the 'none' plugin for variants of None specified,
# since it does not look up endpoints or tokens but rather
# does a passthrough. This is useful for things like Ironic
@@ -1087,7 +1157,9 @@ class OpenStackConfig:
return loader
def _validate_auth(self, config, loader):
def _validate_auth(
self, config: dict[str, ty.Any], loader: loading.BaseLoader[ty.Any]
) -> dict[str, ty.Any]:
# May throw a keystoneauth1.exceptions.NoMatchingPlugin
plugin_options = loader.get_options()
@@ -1127,7 +1199,9 @@ class OpenStackConfig:
return config
def _validate_auth_correctly(self, config, loader):
def _validate_auth_correctly(
self, config: dict[str, ty.Any], loader: loading.BaseLoader[ty.Any]
) -> dict[str, ty.Any]:
# May throw a keystoneauth1.exceptions.NoMatchingPlugin
plugin_options = loader.get_options()
@@ -1158,7 +1232,9 @@ class OpenStackConfig:
return config
def option_prompt(self, config, p_opt):
def option_prompt(
self, config: dict[str, ty.Any], p_opt: 'opts.Opt'
) -> dict[str, ty.Any]:
"""Prompt user for option that requires a value"""
if (
getattr(p_opt, 'prompt', None) is not None
@@ -1168,7 +1244,12 @@ class OpenStackConfig:
config['auth'][p_opt.dest] = self._pw_callback(p_opt.prompt)
return config
def _clean_up_after_ourselves(self, config, p_opt, winning_value):
def _clean_up_after_ourselves(
self,
config: dict[str, ty.Any],
p_opt: 'opts.Opt',
winning_value: ty.Any,
) -> dict[str, ty.Any]:
# Clean up after ourselves
for opt in [p_opt.name] + [o.name for o in p_opt.deprecated]:
opt = opt.replace('-', '_')
@@ -1184,7 +1265,9 @@ class OpenStackConfig:
config['auth'][p_opt.dest] = winning_value
return config
def _handle_value_types(self, config: dict) -> dict:
def _handle_value_types(
self, config: dict[str, ty.Any]
) -> dict[str, ty.Any]:
for key in BOOL_KEYS:
if key in config:
if not isinstance(config[key], bool):
@@ -1196,7 +1279,7 @@ class OpenStackConfig:
config[key] = config[key].split(',')
return config
def magic_fixes(self, config):
def magic_fixes(self, config: dict[str, ty.Any]) -> dict[str, ty.Any]:
"""Perform the set of magic argument fixups"""
# These backwards compat values are only set via argparse. If it's
@@ -1345,8 +1428,12 @@ class OpenStackConfig:
)
def get_one_cloud(
self, cloud=None, validate=True, argparse=None, **kwargs
):
self,
cloud: str | None = None,
validate: bool = True,
argparse: argparse_mod.Namespace | None = None,
**kwargs: ty.Any,
) -> cloud_region.CloudRegion:
warnings.warn(
"The 'get_one_cloud' method is a deprecated alias for 'get_one' "
"and will be removed in a future release.",
@@ -1360,8 +1447,12 @@ class OpenStackConfig:
)
def get_one_cloud_osc(
self, cloud=None, validate=True, argparse=None, **kwargs
):
self,
cloud: str | None = None,
validate: bool = True,
argparse: argparse_mod.Namespace | None = None,
**kwargs: ty.Any,
) -> cloud_region.CloudRegion:
"""Retrieve a single CloudRegion and merge additional options
:param string cloud:
@@ -1456,7 +1547,11 @@ class OpenStackConfig:
)
@staticmethod
def set_one_cloud(config_file, cloud, set_config=None):
def set_one_cloud(
config_file: str,
cloud: str,
set_config: dict[str, ty.Any] | None = None,
) -> None:
"""Set a single cloud configuration.
:param string config_file:

View File

@@ -36,6 +36,7 @@ module = [
"openstack.config._util",
"openstack.config.defaults",
"openstack.config.cloud_region",
"openstack.config.loader",
"openstack.config.vendors",
"openstack.connection",
"openstack.exceptions",

View File

@@ -24,6 +24,8 @@ for cloud in openstack.config.OpenStackConfig().get_all_clouds():
try:
raw_endpoint = c.get_endpoint()
have_current = False
if raw_endpoint is None:
raise Exception('endpoint was empty')
endpoint = raw_endpoint.rsplit('/', 2)[0]
print(endpoint)
r = c.get(endpoint).json()