diff --git a/keywords/cloud_platform/rest/bare_metal/disks/get_host_disks_keywords.py b/keywords/cloud_platform/rest/bare_metal/disks/get_host_disks_keywords.py new file mode 100644 index 00000000..4d122422 --- /dev/null +++ b/keywords/cloud_platform/rest/bare_metal/disks/get_host_disks_keywords.py @@ -0,0 +1,27 @@ +from keywords.base_keyword import BaseKeyword +from keywords.cloud_platform.rest.cloud_rest_client import CloudRestClient +from keywords.cloud_platform.rest.get_rest_url_keywords import GetRestUrlKeywords +from keywords.cloud_platform.system.host.objects.system_host_disk_output import SystemHostDiskOutput + + +class GetHostDisksKeywords(BaseKeyword): + """Keywords for retrieving host disk information via REST API.""" + + def __init__(self): + """Initialize GetHostDisksKeywords with bare metal URL.""" + self.bare_metal_base_url = GetRestUrlKeywords().get_bare_metal_url() + + def get_disks(self, host_id: str) -> SystemHostDiskOutput: + """Get host disks using the REST API. + + Args: + host_id (str): The UUID or ID of the host to retrieve disks for. + + Returns: + SystemHostDiskOutput: Object containing the host's disk information. + """ + response = CloudRestClient().get(f"{self.bare_metal_base_url}/ihosts/{host_id}/idisks") + self.validate_success_status_code(response) + system_host_disk_output = SystemHostDiskOutput(response) + + return system_host_disk_output diff --git a/keywords/cloud_platform/rest/configuration/addresses/get_host_addresses_keywords.py b/keywords/cloud_platform/rest/configuration/addresses/get_host_addresses_keywords.py new file mode 100644 index 00000000..28090614 --- /dev/null +++ b/keywords/cloud_platform/rest/configuration/addresses/get_host_addresses_keywords.py @@ -0,0 +1,26 @@ +from keywords.base_keyword import BaseKeyword +from keywords.cloud_platform.rest.cloud_rest_client import CloudRestClient +from keywords.cloud_platform.rest.configuration.addresses.objects.host_address_output import HostAddressOutput +from keywords.cloud_platform.rest.get_rest_url_keywords import GetRestUrlKeywords + + +class GetHostAddressesKeywords(BaseKeyword): + """Keywords for retrieving host address information via REST API.""" + + def __init__(self): + """Initialize GetHostAddressesKeywords with configuration URL.""" + self.configuration_base_url = GetRestUrlKeywords().get_configuration_url() + + def get_host_addresses(self, host_id: str) -> HostAddressOutput: + """Get host addresses using the REST API. + + Args: + host_id (str): The UUID or ID of the host to retrieve addresses for. + + Returns: + HostAddressOutput: Object containing the host's address information. + """ + response = CloudRestClient().get(f"{self.configuration_base_url}/ihosts/{host_id}/addresses") + self.validate_success_status_code(response) + host_address_output = HostAddressOutput(response) + return host_address_output diff --git a/keywords/cloud_platform/rest/configuration/addresses/objects/host_address_object.py b/keywords/cloud_platform/rest/configuration/addresses/objects/host_address_object.py new file mode 100644 index 00000000..86ffe69f --- /dev/null +++ b/keywords/cloud_platform/rest/configuration/addresses/objects/host_address_object.py @@ -0,0 +1,141 @@ +class HostAddressObject: + """This class represents a Host Address as an object.""" + + def __init__(self): + """Initialize a new HostAddressObject with default values.""" + self.uuid = None + self.interface_uuid = None + self.ifname = None + self.address = None + self.prefix = -1 + self.enable_dad = False + self.forihostid = -1 + self.pool_uuid = None + + def set_uuid(self, uuid: str): + """Set the UUID of the host address. + + Args: + uuid (str): The unique identifier for the host address. + """ + self.uuid = uuid + + def get_uuid(self) -> str: + """Get the UUID of the host address. + + Returns: + str: The unique identifier for the host address. + """ + return self.uuid + + def set_interface_uuid(self, interface_uuid: str): + """Set the interface UUID associated with this address. + + Args: + interface_uuid (str): The unique identifier of the network interface. + """ + self.interface_uuid = interface_uuid + + def get_interface_uuid(self) -> str: + """Get the interface UUID associated with this address. + + Returns: + str: The unique identifier of the network interface. + """ + return self.interface_uuid + + def set_ifname(self, ifname: str): + """Set the interface name. + + Args: + ifname (str): The name of the network interface (e.g., 'eth0', 'oam0'). + """ + self.ifname = ifname + + def get_ifname(self) -> str: + """Get the interface name. + + Returns: + str: The name of the network interface. + """ + return self.ifname + + def set_address(self, address: str): + """Set the IP address. + + Args: + address (str): The IP address (IPv4 or IPv6). + """ + self.address = address + + def get_address(self) -> str: + """Get the IP address. + + Returns: + str: The IP address (IPv4 or IPv6). + """ + return self.address + + def set_prefix(self, prefix: int): + """Set the network prefix length. + + Args: + prefix (int): The subnet prefix length (e.g., 24 for /24). + """ + self.prefix = prefix + + def get_prefix(self) -> int: + """Get the network prefix length. + + Returns: + int: The subnet prefix length. + """ + return self.prefix + + def set_enable_dad(self, enable_dad: bool): + """Set the Duplicate Address Detection (DAD) flag. + + Args: + enable_dad (bool): Whether DAD is enabled for this address. + """ + self.enable_dad = enable_dad + + def get_enable_dad(self) -> bool: + """Get the Duplicate Address Detection (DAD) flag. + + Returns: + bool: Whether DAD is enabled for this address. + """ + return self.enable_dad + + def set_forihostid(self, forihostid: int): + """Set the foreign host ID. + + Args: + forihostid (int): The ID of the host this address belongs to. + """ + self.forihostid = forihostid + + def get_forihostid(self) -> int: + """Get the foreign host ID. + + Returns: + int: The ID of the host this address belongs to. + """ + return self.forihostid + + def set_pool_uuid(self, pool_uuid: str): + """Set the address pool UUID. + + Args: + pool_uuid (str): The unique identifier of the address pool. + """ + self.pool_uuid = pool_uuid + + def get_pool_uuid(self) -> str: + """Get the address pool UUID. + + Returns: + str: The unique identifier of the address pool. + """ + return self.pool_uuid diff --git a/keywords/cloud_platform/rest/configuration/addresses/objects/host_address_output.py b/keywords/cloud_platform/rest/configuration/addresses/objects/host_address_output.py new file mode 100644 index 00000000..5a9bd0f4 --- /dev/null +++ b/keywords/cloud_platform/rest/configuration/addresses/objects/host_address_output.py @@ -0,0 +1,76 @@ +from framework.rest.rest_response import RestResponse +from keywords.cloud_platform.rest.configuration.addresses.objects.host_address_object import HostAddressObject +from keywords.python.type_converter import TypeConverter + + +class HostAddressOutput: + """Parses host addresses from REST API responses.""" + + def __init__(self, host_address_output: RestResponse | list | dict) -> None: + """Initialize HostAddressOutput with parsed address data. + + Args: + host_address_output (RestResponse | list | dict): REST response from /v1/ihosts/{host_id}/addresses endpoint. + """ + if isinstance(host_address_output, RestResponse): + json_object = host_address_output.get_json_content() + if "addresses" in json_object: + addresses = json_object["addresses"] + else: + addresses = [json_object] if json_object else [] + else: + addresses = host_address_output if isinstance(host_address_output, list) else [host_address_output] + + self.host_address_objects: list[HostAddressObject] = [] + + for address in addresses: + host_address_object = HostAddressObject() + + if "uuid" in address: + host_address_object.set_uuid(address["uuid"]) + + if "interface_uuid" in address: + host_address_object.set_interface_uuid(address["interface_uuid"]) + + if "ifname" in address: + host_address_object.set_ifname(address["ifname"]) + + if "address" in address: + host_address_object.set_address(address["address"]) + + if "prefix" in address: + host_address_object.set_prefix(int(address["prefix"])) + + if "enable_dad" in address: + value = address["enable_dad"] if isinstance(address["enable_dad"], bool) else TypeConverter.str_to_bool(address["enable_dad"]) + host_address_object.set_enable_dad(value) + + if "forihostid" in address: + host_address_object.set_forihostid(int(address["forihostid"])) + + if "pool_uuid" in address: + host_address_object.set_pool_uuid(address["pool_uuid"]) + + self.host_address_objects.append(host_address_object) + + def get_host_address_objects(self) -> list[HostAddressObject]: + """Get all host address objects. + + Returns: + list[HostAddressObject]: List of HostAddressObject instances. + """ + return self.host_address_objects + + def get_address_by_ifname(self, ifname: str) -> str | None: + """Get IP address by interface name. + + Args: + ifname (str): Interface name to search for. + + Returns: + str | None: IP address string or None if not found. + """ + for addr in self.host_address_objects: + if addr.get_ifname() == ifname: + return addr.get_address() + return None diff --git a/keywords/cloud_platform/rest/configuration/storage/get_storage_backends_keyword.py b/keywords/cloud_platform/rest/configuration/storage/get_storage_backends_keyword.py new file mode 100644 index 00000000..761b9272 --- /dev/null +++ b/keywords/cloud_platform/rest/configuration/storage/get_storage_backends_keyword.py @@ -0,0 +1,24 @@ +from keywords.base_keyword import BaseKeyword +from keywords.cloud_platform.rest.cloud_rest_client import CloudRestClient +from keywords.cloud_platform.rest.get_rest_url_keywords import GetRestUrlKeywords +from keywords.cloud_platform.system.storage.objects.system_storage_backend_output import SystemStorageBackendOutput + + +class GetStorageBackendKeywords(BaseKeyword): + """Keywords for retrieving storage backend information via REST API.""" + + def __init__(self): + """Initialize GetStorageBackendKeywords with configuration URL.""" + self.configuration_base_url = GetRestUrlKeywords().get_configuration_url() + + def get_storage_backends(self) -> SystemStorageBackendOutput: + """Get storage backends using the REST API. + + Returns: + SystemStorageBackendOutput: Object containing storage backend information. + """ + response = CloudRestClient().get(f"{self.configuration_base_url}/storage_backend") + self.validate_success_status_code(response) + storage_output = SystemStorageBackendOutput(response) + + return storage_output diff --git a/keywords/cloud_platform/rest/configuration/system/objects/system_object.py b/keywords/cloud_platform/rest/configuration/system/objects/system_object.py index b173b764..57c373e4 100644 --- a/keywords/cloud_platform/rest/configuration/system/objects/system_object.py +++ b/keywords/cloud_platform/rest/configuration/system/objects/system_object.py @@ -1,14 +1,15 @@ -import json5 from keywords.cloud_platform.rest.configuration.system.objects.system_capabilities_object import SystemCapabilitiesObject -from keywords.cloud_platform.system.host.objects.host_capabilities_object import HostCapabilities class SystemObject: - """ - Class for System Object - """ + """Represents a StarlingX system with its configuration and metadata.""" def __init__(self, uuid: str): + """Initialize a SystemObject with the given UUID. + + Args: + uuid (str): The unique identifier for the system. + """ self.uuid = uuid self.name: str = None @@ -22,7 +23,7 @@ class SystemObject: self.software_version: str = None self.timezone: str = None self.links = None - self.capabilities: HostCapabilities = None + self.capabilities: SystemCapabilitiesObject = None self.region_name: str = None self.distributed_cloud_role: str = None self.service_project_name: str = None @@ -31,307 +32,303 @@ class SystemObject: self.updated_at: str = None def set_name(self, name: str): - """ - Setter for name - Args - name () - the name + """Set the system name. + + Args: + name (str): The name of the system. """ self.name = name def get_name(self) -> str: - """ - Getter for name - Args - + """Get the system name. + + Returns: + str: The name of the system. """ return self.name - + def set_system_type(self, system_type: str): - """ - Setter for system_type - Args - system_type () - the system_type + """Set the system type. + + Args: + system_type (str): The type of the system. """ self.system_type = system_type def get_system_type(self) -> str: - """ - Getter for system_type - Args - + """Get the system type. + + Returns: + str: The type of the system. """ return self.system_type - + def set_system_mode(self, system_mode: str): - """ - Setter for system_mode - Args - system_mode () - the system_mode + """Set the system mode. + + Args: + system_mode (str): The mode of the system. """ self.system_mode = system_mode def get_system_mode(self) -> str: - """ - Getter for system_mode - Args - + """Get the system mode. + + Returns: + str: The mode of the system. """ return self.system_mode - + def set_description(self, description: str): - """ - Setter for description - Args - description () - the description + """Set the system description. + + Args: + description (str): The description of the system. """ self.description = description def get_description(self) -> str: - """ - Getter for description - Args - + """Get the system description. + + Returns: + str: The description of the system. """ return self.description - + def set_contact(self, contact: str): - """ - Setter for contact - Args - contact () - the contact + """Set the system contact information. + + Args: + contact (str): The contact information for the system. """ self.contact = contact def get_contact(self) -> str: - """ - Getter for contact - Args - + """Get the system contact information. + + Returns: + str: The contact information for the system. """ return self.contact - + def set_location(self, location: str): - """ - Setter for location - Args - location () - the location + """Set the system location. + + Args: + location (str): The location of the system. """ self.location = location def get_location(self) -> str: - """ - Getter for location - Args - + """Get the system location. + + Returns: + str: The location of the system. """ return self.location - + def set_latitude(self, latitude: str): - """ - Setter for latitude - Args - latitude () - the latitude + """Set the system latitude. + + Args: + latitude (str): The latitude coordinate of the system. """ self.latitude = latitude def get_latitude(self) -> str: - """ - Getter for latitude - Args - + """Get the system latitude. + + Returns: + str: The latitude coordinate of the system. """ return self.latitude - + def set_longitude(self, longitude: str): - """ - Setter for longitude - Args - longitude () - the longitude + """Set the system longitude. + + Args: + longitude (str): The longitude coordinate of the system. """ self.longitude = longitude def get_longitude(self) -> str: - """ - Getter for longitude - Args - + """Get the system longitude. + + Returns: + str: The longitude coordinate of the system. """ return self.longitude - + def set_software_version(self, software_version: str): - """ - Setter for software_version - Args - software_version () - the software_version + """Set the system software version. + + Args: + software_version (str): The software version of the system. """ self.software_version = software_version def get_software_version(self) -> str: - """ - Getter for software_version - Args - + """Get the system software version. + + Returns: + str: The software version of the system. """ return self.software_version - + def set_timezone(self, timezone: str): - """ - Setter for timezone - Args - name () - the timezone + """Set the system timezone. + + Args: + timezone (str): The timezone of the system. """ self.timezone = timezone def get_timezone(self) -> str: - """ - Getter for timezone - Args - + """Get the system timezone. + + Returns: + str: The timezone of the system. """ return self.timezone - + def set_links(self, links: str): - """ - Setter for links - Args - links () - the links + """Set the system links. + + Args: + links (str): The links associated with the system. """ self.links = links def get_links(self) -> str: - """ - Getter for links - Args - + """Get the system links. + + Returns: + str: The links associated with the system. """ return self.links - - def set_capabilities(self, capabilities): - """ - Setter for host capabilities + + def set_capabilities(self, capabilities: dict): + """Set the system capabilities from a dictionary. + Args: - capabilities (): the string of capabilities from the system host-list command (json format) - - Returns: - + capabilities (dict): Dictionary containing system capabilities data. """ - system_capability = SystemCapabilitiesObject() - if 'region_config' in capabilities: - system_capability.set_region_config(capabilities['region_config']) - if 'vswitch_type' in capabilities: - system_capability.set_vswitch_type(capabilities['vswitch_type']) - if 'shared_services' in capabilities: - system_capability.set_shared_services(capabilities['shared_services']) - if 'sdn_enabled' in capabilities: - system_capability.set_sdn_enabled(capabilities['sdn_enabled']) - if 'https_enabled' in capabilities: - system_capability.set_https_enabled(capabilities['https_enabled']) - if 'bm_region' in capabilities: - system_capability.set_bm_region(capabilities['bm_region']) + if "region_config" in capabilities: + system_capability.set_region_config(capabilities["region_config"]) + if "vswitch_type" in capabilities: + system_capability.set_vswitch_type(capabilities["vswitch_type"]) + if "shared_services" in capabilities: + system_capability.set_shared_services(capabilities["shared_services"]) + if "sdn_enabled" in capabilities: + system_capability.set_sdn_enabled(capabilities["sdn_enabled"]) + if "https_enabled" in capabilities: + system_capability.set_https_enabled(capabilities["https_enabled"]) + if "bm_region" in capabilities: + system_capability.set_bm_region(capabilities["bm_region"]) - self.capabilities = capabilities + self.capabilities = system_capability def get_capabilities(self) -> SystemCapabilitiesObject: - """ - Getter for capabilities - Returns: + """Get the system capabilities object. + Returns: + SystemCapabilitiesObject: The system's capabilities object. """ return self.capabilities - + def set_region_name(self, region_name: str): - """ - Setter for region_name - Args - region_name () - the region_name + """Set the system region name. + + Args: + region_name (str): The name of the region for this system. """ self.region_name = region_name def get_region_name(self) -> str: - """ - Getter for region_name - Args - + """Get the system region name. + + Returns: + str: The name of the region for this system. """ return self.region_name - + def set_distributed_cloud_role(self, distributed_cloud_role: str): - """ - Setter for distributed_cloud_role - Args - distributed_cloud_role () - the distributed_cloud_role + """Set the distributed cloud role. + + Args: + distributed_cloud_role (str): The distributed cloud role of the system. """ self.distributed_cloud_role = distributed_cloud_role def get_distributed_cloud_role(self) -> str: - """ - Getter for distributed_cloud_role - Args - + """Get the distributed cloud role. + + Returns: + str: The distributed cloud role of the system. """ return self.distributed_cloud_role - + def set_service_project_name(self, service_project_name: str): - """ - Setter for service_project_name - Args - service_project_name () - the service_project_name + """Set the service project name. + + Args: + service_project_name (str): The name of the service project. """ self.service_project_name = service_project_name def get_service_project_name(self) -> str: - """ - Getter for service_project_name - Args - + """Get the service project name. + + Returns: + str: The name of the service project. """ return self.service_project_name - + def set_security_feature(self, security_feature: str): - """ - Setter for security_feature - Args - security_feature () - the security_feature + """Set the security feature. + + Args: + security_feature (str): The security feature of the system. """ self.security_feature = security_feature def get_security_feature(self) -> str: - """ - Getter for security_feature - Args - + """Get the security feature. + + Returns: + str: The security feature of the system. """ return self.security_feature - + def set_created_at(self, created_at: str): - """ - Setter for created_at - Args - created_at () - the created_at + """Set the creation timestamp. + + Args: + created_at (str): The timestamp when the system was created. """ self.created_at = created_at def get_created_at(self) -> str: - """ - Getter for created_at - Args - + """Get the creation timestamp. + + Returns: + str: The timestamp when the system was created. """ return self.created_at - + def set_updated_at(self, updated_at: str): - """ - Setter for updated_at - Args - updated_at () - the updated_at + """Set the update timestamp. + + Args: + updated_at (str): The timestamp when the system was last updated. """ self.updated_at = updated_at def get_updated_at(self) -> str: + """Get the update timestamp. + + Returns: + str: The timestamp when the system was last updated. """ - Getter for updated_at - Args - - """ - return self.updated_at \ No newline at end of file + return self.updated_at diff --git a/keywords/cloud_platform/system/application/object/system_application_list_output.py b/keywords/cloud_platform/system/application/object/system_application_list_output.py index 622f4ebc..5745d136 100644 --- a/keywords/cloud_platform/system/application/object/system_application_list_output.py +++ b/keywords/cloud_platform/system/application/object/system_application_list_output.py @@ -5,8 +5,8 @@ from keywords.cloud_platform.system.system_table_parser import SystemTableParser class SystemApplicationListOutput: - """ - This class parses the output of the command 'system application-list' + """This class parses the output of the command 'system application-list'. + The parsing result is a 'SystemApplicationListObject' instance. Example: @@ -27,11 +27,11 @@ class SystemApplicationListOutput: """ - def __init__(self, system_application_list_output): - """ - Constructor + def __init__(self, system_application_list_output: str) -> None: + """Initialize SystemApplicationListOutput with parsed application data. + Args: - system_application_list_output: the output of the command 'system application-list'. + system_application_list_output (str): The output of the 'system application-list' command. """ self.system_applications: [SystemApplicationListObject] = [] system_table_parser = SystemTableParser(system_application_list_output) @@ -41,33 +41,36 @@ class SystemApplicationListOutput: if self.is_valid_output(value): self.system_applications.append( SystemApplicationListObject( - value['application'], - value['version'], - value['manifest name'], - value['manifest file'], - value['status'], - value['progress'], + value["application"], + value["version"], + value["manifest name"], + value["manifest file"], + value["status"], + value["progress"], ) ) else: raise KeywordException(f"The output line {value} was not valid") - def get_applications(self) -> [SystemApplicationListObject]: - """ - Returns the list of applications objects - Returns: + def get_applications(self) -> list[SystemApplicationListObject]: + """Get the list of all application objects. + Returns: + list[SystemApplicationListObject]: List of SystemApplicationListObject instances. """ return self.system_applications def get_application(self, application: str) -> SystemApplicationListObject: - """ - Gets the given application + """Get a specific application by name. + Args: - application (): the name of the application + application (str): The name of the application to retrieve. - Returns: the application object + Returns: + SystemApplicationListObject: The application object. + Raises: + KeywordException: If no application with the given name is found. """ applications = list(filter(lambda app: app.get_application() == application, self.system_applications)) if len(applications) == 0: @@ -75,34 +78,46 @@ class SystemApplicationListOutput: return applications[0] - @staticmethod - def is_valid_output(value): - """ - Checks to ensure the output has the correct keys + def application_exists(self, application: str) -> bool: + """Check if an application exists in the list. + Args: - value (): the value to check + application (str): The name of the application to check. Returns: + bool: True if the application exists, False otherwise. + """ + applications = list(filter(lambda app: app.get_application() == application, self.system_applications)) + return len(applications) > 0 + @staticmethod + def is_valid_output(value: dict) -> bool: + """Validate that output contains all required keys. + + Args: + value (dict): The parsed output value to validate. + + Returns: + bool: True if the output is valid, False otherwise. """ valid = True - if 'application' not in value: - get_logger().log_error(f'application is not in the output value: {value}') + if "application" not in value: + get_logger().log_error(f"application is not in the output value: {value}") valid = False - if 'version' not in value: - get_logger().log_error(f'version is not in the output value: {value}') + if "version" not in value: + get_logger().log_error(f"version is not in the output value: {value}") valid = False - if 'manifest name' not in value: - get_logger().log_error(f'manifest name is not in the output value: {value}') + if "manifest name" not in value: + get_logger().log_error(f"manifest name is not in the output value: {value}") valid = False - if 'manifest file' not in value: - get_logger().log_error(f'manifest file is not in the output value: {value}') + if "manifest file" not in value: + get_logger().log_error(f"manifest file is not in the output value: {value}") valid = False - if 'status' not in value: - get_logger().log_error(f'status is not in the output value: {value}') + if "status" not in value: + get_logger().log_error(f"status is not in the output value: {value}") valid = False - if 'progress' not in value: - get_logger().log_error(f'progress is not in the output value: {value}') + if "progress" not in value: + get_logger().log_error(f"progress is not in the output value: {value}") valid = False return valid diff --git a/keywords/cloud_platform/system/host/objects/system_host_disk_output.py b/keywords/cloud_platform/system/host/objects/system_host_disk_output.py index 85cd6255..3cfa9103 100644 --- a/keywords/cloud_platform/system/host/objects/system_host_disk_output.py +++ b/keywords/cloud_platform/system/host/objects/system_host_disk_output.py @@ -1,63 +1,74 @@ +from framework.rest.rest_response import RestResponse from keywords.cloud_platform.system.host.objects.system_host_disk_object import SystemHostDiskObject from keywords.cloud_platform.system.system_table_parser import SystemTableParser class SystemHostDiskOutput: - """ - This class parses the output of 'system host-disk-list' commands into a list of SystemHostDiskObject - """ + """Parses the output of 'system host-disk-list' commands into SystemHostDiskObject instances.""" - def __init__(self, system_host_disk_output): - """ - Constructor + def __init__(self, system_host_disk_output: str | RestResponse) -> None: + """Initialize SystemHostDiskOutput with parsed disk data. Args: - system_host_disk_output: String output of 'system host-disk-list' command + system_host_disk_output (str | RestResponse): String output from 'system host-disk-list' command or RestResponse object. """ + self.system_host_disks: list[SystemHostDiskObject] = [] - self.system_host_disks: [SystemHostDiskObject] = [] - system_table_parser = SystemTableParser(system_host_disk_output) - output_values = system_table_parser.get_output_values_list() + if isinstance(system_host_disk_output, RestResponse): # came from REST and is already in dict form + json_object = system_host_disk_output.get_json_content() + if "idisks" in json_object: + disks = json_object["idisks"] + else: + disks = [json_object] + else: + system_table_parser = SystemTableParser(system_host_disk_output) + disks = system_table_parser.get_output_values_list() - for value in output_values: + for value in disks: system_host_disk_object = SystemHostDiskObject() - if 'uuid' in value: - system_host_disk_object.set_uuid(value['uuid']) + if "uuid" in value: + system_host_disk_object.set_uuid(value["uuid"]) - if 'device_node' in value: - system_host_disk_object.set_device_node(value['device_node']) + if "device_node" in value: + system_host_disk_object.set_device_node(value["device_node"]) - if 'device_num' in value: - system_host_disk_object.set_device_num(value['device_num']) + if "device_num" in value: + system_host_disk_object.set_device_num(value["device_num"]) - if 'device_type' in value: - system_host_disk_object.set_device_type(value['device_type']) + if "device_type" in value: + system_host_disk_object.set_device_type(value["device_type"]) - if 'size_gib' in value: - system_host_disk_object.set_size_gib(float(value['size_gib'])) + if "size_gib" in value: + system_host_disk_object.set_size_gib(float(value["size_gib"])) + if "size_mib" in value: + system_host_disk_object.set_size_gib(float(value["size_mib"]) / 1024.0) - if 'available_gib' in value: - system_host_disk_object.set_available_gib(float(value['available_gib'])) + if "available_gib" in value: + system_host_disk_object.set_available_gib(float(value["available_gib"])) - if 'rpm' in value: - system_host_disk_object.set_rpm(value['rpm']) + if "available_mib" in value: + system_host_disk_object.set_available_gib(float(value["available_mib"]) / 1024.0) - if 'serial_id' in value: - system_host_disk_object.set_serial_id(value['serial_id']) + if "rpm" in value: + system_host_disk_object.set_rpm(value["rpm"]) - if 'device_path' in value: - system_host_disk_object.set_device_path(value['device_path']) + if "serial_id" in value: + system_host_disk_object.set_serial_id(value["serial_id"]) + + if "device_path" in value: + system_host_disk_object.set_device_path(value["device_path"]) self.system_host_disks.append(system_host_disk_object) def has_minimum_disk_space_in_gb(self, minimum_disk_space_in_gb: float) -> bool: - """ - This function will look for a disk in the list of disks with at least GB - of free space. + """Check if any disk has at least the specified amount of free space. - Returns: True if this host has at least one disk with at least GB of free space. + Args: + minimum_disk_space_in_gb (float): The minimum required free disk space in GB. + Returns: + bool: True if at least one disk has the minimum required free space, False otherwise. """ return any(item.get_available_gib() >= minimum_disk_space_in_gb for item in self.system_host_disks) diff --git a/keywords/cloud_platform/system/storage/objects/system_storage_backend_output.py b/keywords/cloud_platform/system/storage/objects/system_storage_backend_output.py index 08143c28..a76c2fa3 100644 --- a/keywords/cloud_platform/system/storage/objects/system_storage_backend_output.py +++ b/keywords/cloud_platform/system/storage/objects/system_storage_backend_output.py @@ -1,4 +1,6 @@ from framework.exceptions.keyword_exception import KeywordException +from framework.rest.rest_response import RestResponse +from keywords.cloud_platform.system.host.objects.storage_capabilities_object import StorageCapabilities from keywords.cloud_platform.system.storage.objects.system_storage_backend_object import SystemStorageBackendObject from keywords.cloud_platform.system.system_table_parser import SystemTableParser @@ -24,11 +26,19 @@ class SystemStorageBackendOutput: """ self.system_storage_backends: [SystemStorageBackendObject] = [] - system_table_parser = SystemTableParser(system_storage_backend_list_output) - output_values = system_table_parser.get_output_values_list() + + if isinstance(system_storage_backend_list_output, RestResponse): # came from REST and is already in dict format + json_object = system_storage_backend_list_output.get_json_content() + if "storage_backends" in json_object: + storage_backends = json_object["storage_backends"] + else: + storage_backends = [json_object] + else: + system_table_parser = SystemTableParser(system_storage_backend_list_output) + storage_backends = system_table_parser.get_output_values_list() system_storage_backend_object = None - for value in output_values: + for value in storage_backends: if "name" not in value: raise KeywordException(f"The output line {value} was not valid because it is missing an 'name'.") @@ -58,7 +68,18 @@ class SystemStorageBackendOutput: system_storage_backend_object.set_services(value["services"]) if "capabilities" in value: - system_storage_backend_object.add_capabilities(value["capabilities"]) + storage_capabilities = value["capabilities"] + # if it's from REST, then we are already in dict format + if isinstance(storage_capabilities, dict): + storage_capabilities_object = StorageCapabilities() + if "replication" in storage_capabilities: + storage_capabilities_object.set_replication(storage_capabilities["replication"]) + if "min_replication" in storage_capabilities: + storage_capabilities_object.set_min_replication(storage_capabilities["min_replication"]) + if "deployment_model" in storage_capabilities: + storage_capabilities_object.set_deployment_model(storage_capabilities["deployment_model"]) + else: + system_storage_backend_object.add_capabilities(storage_capabilities) self.system_storage_backends.append(system_storage_backend_object) diff --git a/scripts/lab_capability_scanner.py b/scripts/lab_capability_scanner.py index 9fe93e11..53eb415f 100644 --- a/scripts/lab_capability_scanner.py +++ b/scripts/lab_capability_scanner.py @@ -5,84 +5,42 @@ from optparse import OptionParser import json5 -from config.configuration_file_locations_manager import ( - ConfigurationFileLocationsManager, -) +from config.configuration_file_locations_manager import ConfigurationFileLocationsManager from config.configuration_manager import ConfigurationManager from config.lab.objects.lab_config import LabConfig from config.lab.objects.node import Node from framework.database.objects.lab_capability import LabCapability from framework.database.operations.capability_operation import CapabilityOperation -from framework.database.operations.lab_capability_operation import ( - LabCapabilityOperation, -) +from framework.database.operations.lab_capability_operation import LabCapabilityOperation from framework.database.operations.lab_operation import LabOperation from framework.logging.automation_logger import get_logger from framework.ssh.prompt_response import PromptResponse from framework.ssh.ssh_connection import SSHConnection from keywords.bmc.ipmitool.is_ipmitool_keywords import IsIPMIToolKeywords from keywords.bmc.ipmitool.sensor.ipmitool_sensor_keywords import IPMIToolSensorKeywords -from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import ( - DcManagerSubcloudListKeywords, -) -from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_availability_enum import ( - DcManagerSubcloudListAvailabilityEnum, -) -from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_management_enum import ( - DcManagerSubcloudListManagementEnum, -) -from keywords.cloud_platform.openstack.endpoint.openstack_endpoint_list_keywords import ( - OpenStackEndpointListKeywords, -) -from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_cpus_keywords import ( - GetHostsCpusKeywords, -) -from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import ( - GetHostsKeywords, -) -from keywords.cloud_platform.rest.bare_metal.memory.get_host_memory_keywords import ( - GetHostMemoryKeywords, -) -from keywords.cloud_platform.rest.bare_metal.ports.get_host_ports_keywords import ( - GetHostPortsKeywords, -) -from keywords.cloud_platform.rest.configuration.devices.system_host_device_keywords import ( - GetHostDevicesKeywords, -) -from keywords.cloud_platform.rest.configuration.interfaces.get_interfaces_keywords import ( - GetInterfacesKeywords, -) -from keywords.cloud_platform.rest.configuration.storage.get_storage_keywords import ( - GetStorageKeywords, -) +from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords +from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_availability_enum import DcManagerSubcloudListAvailabilityEnum +from keywords.cloud_platform.dcmanager.objects.dcmanger_subcloud_list_management_enum import DcManagerSubcloudListManagementEnum +from keywords.cloud_platform.rest.bare_metal.disks.get_host_disks_keywords import GetHostDisksKeywords +from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_cpus_keywords import GetHostsCpusKeywords +from keywords.cloud_platform.rest.bare_metal.hosts.get_hosts_keywords import GetHostsKeywords +from keywords.cloud_platform.rest.bare_metal.memory.get_host_memory_keywords import GetHostMemoryKeywords +from keywords.cloud_platform.rest.bare_metal.ports.get_host_ports_keywords import GetHostPortsKeywords +from keywords.cloud_platform.rest.configuration.addresses.get_host_addresses_keywords import GetHostAddressesKeywords +from keywords.cloud_platform.rest.configuration.devices.system_host_device_keywords import GetHostDevicesKeywords +from keywords.cloud_platform.rest.configuration.interfaces.get_interfaces_keywords import GetInterfacesKeywords +from keywords.cloud_platform.rest.configuration.storage.get_storage_backends_keyword import GetStorageBackendKeywords +from keywords.cloud_platform.rest.configuration.storage.get_storage_keywords import GetStorageKeywords +from keywords.cloud_platform.rest.configuration.system.get_system_keywords import GetSystemKeywords from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords -from keywords.cloud_platform.system.host.objects.system_host_if_output import ( - SystemHostInterfaceOutput, -) -from keywords.cloud_platform.system.host.objects.system_host_show_output import ( - SystemHostShowOutput, -) -from keywords.cloud_platform.system.host.system_host_disk_keywords import ( - SystemHostDiskKeywords, -) -from keywords.cloud_platform.system.host.system_host_list_keywords import ( - SystemHostListKeywords, -) -from keywords.cloud_platform.system.host.system_host_show_keywords import ( - SystemHostShowKeywords, -) -from keywords.cloud_platform.system.oam.objects.system_oam_show_output import ( - SystemOamShowOutput, -) -from keywords.cloud_platform.system.oam.system_oam_show_keywords import ( - SystemOamShowKeywords, -) +from keywords.cloud_platform.system.host.objects.system_host_if_output import SystemHostInterfaceOutput +from keywords.cloud_platform.system.oam.objects.system_oam_show_output import SystemOamShowOutput +from keywords.cloud_platform.system.oam.system_oam_show_keywords import SystemOamShowKeywords from testcases.conftest import log_configuration def find_capabilities(lab_config: LabConfig) -> list[str]: - """ - Finds the capabilities of the given lab. + """Find the capabilities of the given lab. Args: lab_config (LabConfig): The lab configuration object. @@ -93,10 +51,12 @@ def find_capabilities(lab_config: LabConfig) -> list[str]: lab_config.lab_capabilities = [] ssh_connection = LabConnectionKeywords().get_active_controller_ssh() - endpoint_output = OpenStackEndpointListKeywords(ssh_connection).endpoint_list() - lab_config.set_horizon_url(endpoint_output.get_horizon_url()) + lab_config.set_horizon_url(get_horizon_url()) - is_dc_system = endpoint_output.is_endpoint("dcmanager") + # Check if this is a DC system by checking distributed_cloud_role + system_output = GetSystemKeywords().get_system() + system_object = system_output.get_system_object() + is_dc_system = system_object.get_distributed_cloud_role() == "systemcontroller" if is_dc_system: subclouds = retrieve_subclouds(lab_config, ssh_connection) lab_config.set_subclouds(subclouds[:]) @@ -113,8 +73,7 @@ def find_capabilities(lab_config: LabConfig) -> list[str]: def find_subclouds_capabilities(lab_config: LabConfig) -> list[str]: - """ - Finds the capabilities of the subclouds from the given lab. + """Find the capabilities of the subclouds from the given lab. Args: lab_config (LabConfig): The lab configuration object. @@ -149,8 +108,7 @@ def find_subclouds_capabilities(lab_config: LabConfig) -> list[str]: def get_subcloud_name_from_path(subcloud_config_file_path: str) -> str: - """ - Returns the name of the cloud from a subcloud's config file path. + """Extract the subcloud name from a config file path. Args: subcloud_config_file_path (str): The subcloud config file path. @@ -164,19 +122,15 @@ def get_subcloud_name_from_path(subcloud_config_file_path: str) -> str: def create_subcloud_config_file_if_needed(host: LabConfig, subcloud_name: str, subcloud_config_file_path: str) -> None: - """ - Creates a new config file for the related subcloud in the given path. + """Create a new config file for the subcloud if it doesn't exist. Args: host (LabConfig): The host lab configuration. subcloud_name (str): The name of the subcloud. subcloud_config_file_path (str): The subcloud's config file path. - Returns: - None: - Note: - The initial content of this created file is the main part of the host config file, but with the IP empty. + The initial content is based on the host config but with empty IP. """ if os.path.isfile(subcloud_config_file_path): return @@ -194,8 +148,7 @@ def create_subcloud_config_file_if_needed(host: LabConfig, subcloud_name: str, s def is_sriov(host_interface_list_output: SystemHostInterfaceOutput) -> bool: - """ - Returns True if SR-IOV is enabled on the given node. + """Check if SR-IOV is enabled on the given node. Args: host_interface_list_output (SystemHostInterfaceOutput): Output of the system host interface list command. @@ -209,41 +162,11 @@ def is_sriov(host_interface_list_output: SystemHostInterfaceOutput) -> bool: return False -def has_min_space_30G(ssh_connection: SSHConnection, node: Node) -> bool: - """ - Returns true if the node has at least 30 GB of free space in one of its disks. - - Args: - ssh_connection (SSHConnection): The SSH connection to the node. - node (Node): the node - - Returns: - bool: True if the node has at least 30 GB of free space in one of its disks,, False otherwise. - """ - host_disk_output = SystemHostDiskKeywords(ssh_connection).get_system_host_disk_list(node.get_name()) - return host_disk_output.has_minimum_disk_space_in_gb(30) - - -def get_host_show_output(ssh_connection: SSHConnection, node: Node) -> SystemHostShowOutput: - """ - Returns an object of SystemHostShowOutput. This object represents the output of the 'system host-show' command. - - Args: - ssh_connection (SSHConnection): The SSH connection to execute the command. - node (Node): The node whose details are being retrieved. - - Returns: - SystemHostShowOutput: An object representing the output of the 'system host-show' command. - """ - return SystemHostShowKeywords(ssh_connection).get_system_host_show_output(node.get_name()) - - def has_host_bmc_sensor(ssh_connection: SSHConnection) -> bool: - """ - Returns True if the node has BMC sensors. + """Check if the node has BMC sensors. Args: - ssh_connection (SSHConnection): The SSH connection. + ssh_connection (SSHConnection): The SSH connection to the host. Returns: bool: True if the node has BMC sensors, False otherwise. @@ -259,18 +182,16 @@ def has_host_bmc_sensor(ssh_connection: SSHConnection) -> bool: def retrieve_subclouds(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[LabConfig]: - """ - Gets the list of subclouds on this lab. + """Get the list of online and managed subclouds. - Subclouds whose 'availability' is different from 'online' and 'management' is different from 'managed' are not - considered. + Only subclouds with 'availability' = 'online' and 'management' = 'managed' are considered. Args: lab_config (LabConfig): The lab config object. ssh_connection (SSHConnection): Connection to the active controller of the central cloud. Returns: - list[LabConfig]: the list of LabConfig objects matching the online and available subclouds of this lab. + list[LabConfig]: List of LabConfig objects for online and managed subclouds. """ subclouds: [LabConfig] = [] @@ -305,15 +226,14 @@ def retrieve_subclouds(lab_config: LabConfig, ssh_connection: SSHConnection) -> def get_subcloud_ip(subcloud_name: str, central_cloud_ssh_connection: SSHConnection) -> str: - """ - Gets the external IP associated with the 'subcloud_name'. + """Get the external IP address of a subcloud. Args: - subcloud_name (str): The name of the cloud from which one wants to obtain the IP. - central_cloud_ssh_connection (SSHConnection): The SSH connection to a central cloud. + subcloud_name (str): The name of the subcloud. + central_cloud_ssh_connection (SSHConnection): SSH connection to the central cloud. Returns: - str: subcloud's IP. + str: The subcloud's IP address. """ # Executes the command 'system oam-show' on the subcloud to get the subcloud's IP. password_prompt = PromptResponse("password:", ConfigurationManager.get_lab_config().get_admin_credentials().get_password()) @@ -363,12 +283,13 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod Raises: RuntimeError: If no controller node is found in the lab. """ - hosts = SystemHostListKeywords(ssh_connection).get_system_host_with_extra_column(["subfunctions", "bm_ip", "bm_username"]) + host_show_output = GetHostsKeywords().get_hosts() + hosts = host_show_output.get_all_system_host_show_objects() nodes = [] # Count the controllers to decide if the lab is Simplex or not. controllers_count = 0 - for host in hosts.get_hosts(): + for host in hosts: if host.get_personality() == "controller": controllers_count += 1 @@ -380,22 +301,22 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod raise RuntimeError("Failed to find at least one controller on this lab.") # Look at the Capabilities of each host individually. - for host in hosts.get_hosts(): + for host in hosts: - name = host.get_host_name() + name = host.get_hostname() node_dict = { - "ip": get_controller_ip(ssh_connection, name), + "ip": get_controller_ip(host_show_output, name), "node_type": host.get_personality(), "node_capabilities": [], } - node = Node(host.get_host_name(), node_dict) + node = Node(host.get_hostname(), node_dict) - node.set_subfunctions(host.get_sub_functions()) + node.set_subfunctions(host.get_subfunctions()) node.set_bm_username(host.get_bm_username()) node.set_bm_ip(host.get_bm_ip()) # Gather data from the system into objects. - host_show_output = GetHostsKeywords().get_hosts() + host_uuid = host_show_output.get_system_host_show_object(node.get_name()).get_uuid() host_cpu_output = GetHostsCpusKeywords().get_hosts_cpus(host_uuid) @@ -404,6 +325,7 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod host_port_output = GetHostPortsKeywords().get_ports(host_uuid) host_memory_output = GetHostMemoryKeywords().get_memory(host_uuid) host_storage_output = GetStorageKeywords().get_storage(host_uuid) + host_disk_output = GetHostDisksKeywords().get_disks(host_uuid) # Parse the data to define the lab's capabilities. if is_sriov(host_interface_list_output): @@ -463,7 +385,7 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod node.append_node_capability("lab_has_columbiaville") lab_config.add_lab_capability("lab_has_columbiaville") - if has_min_space_30G(ssh_connection, node): + if host_disk_output.has_minimum_disk_space_in_gb(30): node.append_node_capability("lab_has_min_space_30G") lab_config.add_lab_capability("lab_has_min_space_30G") @@ -511,47 +433,48 @@ def scan_hosts(lab_config: LabConfig, ssh_connection: SSHConnection) -> list[Nod return nodes -def get_controller_ip(ssh_connection: SSHConnection, controller_name: str) -> str | None: - """ - Getter for controller ip. +def get_controller_ip(host_show_output: object, controller_name: str) -> str | None: + """Get the IP address of a controller. Args: - ssh_connection (SSHConnection): The SSH connection to the lab. + host_show_output (object): The host show output object. controller_name (str): The name of the controller. Returns: str | None: The IP address of the controller, or None if not found. """ - system_oam_output = SystemOamShowKeywords(ssh_connection).oam_show() + host_uuid = host_show_output.get_system_host_show_object(controller_name).get_uuid() - if controller_name == "controller-0": - return system_oam_output.get_oam_c0_ip() - elif controller_name == "controller-1": - return system_oam_output.get_oam_c1_ip() - else: - get_logger().log_error(f"GET IP for {controller_name} has not been implemented") + if host_uuid is None: + get_logger().log_error(f"Host {controller_name} not found") + return None - return None + host_addresses_output = GetHostAddressesKeywords().get_host_addresses(host_uuid) + return host_addresses_output.get_address_by_ifname("oam0") def get_horizon_url() -> str: - """ - Getter for Horizon URL. + """Get the Horizon URL using system CLI commands. Returns: str: The formatted Horizon URL. """ + # Get system capabilities to check if HTTPS is enabled + system_output = GetSystemKeywords().get_system() + system_capabilities = system_output.get_system_object().get_capabilities() + https_enabled = system_capabilities.get_https_enabled() if system_capabilities else False + + # Get OAM IP information using system CLI ssh_connection = LabConnectionKeywords().get_active_controller_ssh() - endpoint_output = OpenStackEndpointListKeywords(ssh_connection).endpoint_list() + oam_show_output = SystemOamShowKeywords(ssh_connection).oam_show() - # Remove port from orignal url and then add 8443 for https or 8080 for http - url = endpoint_output.get_endpoint("keystone", "public").get_url().rsplit(":", 1)[0] - if "https" in url: - url += ":8443/" - else: - url += ":8080/" + # Get the appropriate IP (oam_ip for vbox, oam_floating_ip for physical) + oam_ip = oam_show_output.get_oam_ip() or oam_show_output.get_oam_floating_ip() - return url + # Construct Horizon URL + protocol = "https" if https_enabled else "http" + port = "8443" if https_enabled else "8080" + return f"{protocol}://{oam_ip}:{port}/" def get_lab_type(lab_config: LabConfig) -> str: @@ -587,6 +510,32 @@ def get_lab_type(lab_config: LabConfig) -> str: return "Duplex" +def is_aio(lab_config: LabConfig) -> bool: + """Checks if the lab is AIO. + + Args: + lab_config (LabConfig): The lab configuration object. + + Returns: + bool: True if the lab is AIO, False otherwise. + """ + nodes = lab_config.get_nodes() + worker_nodes = list(filter(lambda node: node.get_type() == "worker", nodes)) + + return len(worker_nodes) == 0 + + +def is_rook_ceph() -> bool: + """ + Checks if the lab is using Rook Ceph. + + Returns: + bool: True if the lab is using Rook Ceph, False otherwise. + """ + backends = GetStorageBackendKeywords().get_storage_backends() + return backends.is_backend_configured("ceph-rook") + + def write_config(lab_config: LabConfig) -> None: """ Writes the new config out to the current config @@ -764,6 +713,14 @@ if __name__ == "__main__": lab_type = get_lab_type(lab_config) lab_config.set_lab_type(lab_type) + # check if the lab is an aio + if is_aio(lab_config): + lab_config.add_lab_capability("lab_is_aio") + + # check if the lab is using rook ceph + if is_rook_ceph(): + lab_config.add_lab_capability("lab_has_rook_ceph") + if ConfigurationManager.get_database_config().use_database(): # insert lab into db if it doesn't already exist lab_name = lab_config.get_lab_name()