diff --git a/keywords/k8s/certificate/kubectl_get_certificate_keywords.py b/keywords/k8s/certificate/kubectl_get_certificate_keywords.py new file mode 100644 index 00000000..2c2aa0c6 --- /dev/null +++ b/keywords/k8s/certificate/kubectl_get_certificate_keywords.py @@ -0,0 +1,60 @@ +from framework.ssh.ssh_connection import SSHConnection +from framework.validation.validation import validate_equals_with_retry +from keywords.base_keyword import BaseKeyword +from keywords.k8s.certificate.object.kubectl_get_certificate_output import KubectlGetCertsOutput +from keywords.k8s.k8s_command_wrapper import export_k8s_config + + +class KubectlGetCertStatusKeywords(BaseKeyword): + """ + Class for 'kubectl get certificate' keywords + """ + + def __init__(self, ssh_connection: SSHConnection): + """ + Constructor + + Args: + ssh_connection (SSHConnection): SSH connection object used to interact with the Kubernetes cluster. + """ + self.ssh_connection = ssh_connection + + def get_certificates(self, namespace: str = None) -> KubectlGetCertsOutput: + """ + Gets the k8s certificate that are available using 'kubectl get certificate'. + + Args: + namespace (str, optional): The namespace to retrieve certificates from. Defaults to None. + + Returns: + KubectlGetCertsOutput: Parsed output of the 'kubectl get certificate' command. + + """ + arg_namespace = "" + if namespace: + arg_namespace = f"-n {namespace}" + + kubectl_get_issuer_output = self.ssh_connection.send(export_k8s_config(f"kubectl {arg_namespace} get certificate")) + self.validate_success_return_code(self.ssh_connection) + + cert_list_output = KubectlGetCertsOutput(kubectl_get_issuer_output) + + return cert_list_output + + def wait_for_certs_status(self, certs_name: str, is_ready: bool, namespace: str = None, timeout: int = 600) -> None: + """ + Waits timeout amount of time for the given certs to be in the given status + + Args: + certs_name (str): the name of the certificate + is_ready (bool): the is_ready status + namespace (str): the namespace + timeout (int, optional): the timeout in secs + + """ + + def get_cert_status(): + cert_status = self.get_certificates(namespace).get_cert(certs_name).get_ready() + return bool(cert_status) + + validate_equals_with_retry(get_cert_status, is_ready, "Verify the certs status issued", timeout=600) diff --git a/keywords/k8s/certificate/kubectl_get_issuer_keywords.py b/keywords/k8s/certificate/kubectl_get_issuer_keywords.py new file mode 100644 index 00000000..f0e12eef --- /dev/null +++ b/keywords/k8s/certificate/kubectl_get_issuer_keywords.py @@ -0,0 +1,60 @@ +from framework.ssh.ssh_connection import SSHConnection +from framework.validation.validation import validate_equals_with_retry +from keywords.base_keyword import BaseKeyword +from keywords.k8s.certificate.object.kubectl_get_issuer_output import KubectlGetIssuerOutput +from keywords.k8s.k8s_command_wrapper import export_k8s_config + + +class KubectlGetCertIssuerKeywords(BaseKeyword): + """ + Class for 'kubectl get issuer' keywords + """ + + def __init__(self, ssh_connection: SSHConnection): + """ + Constructor + + Args: + ssh_connection (SSHConnection): SSH connection object used to interact with the Kubernetes cluster. + """ + self.ssh_connection = ssh_connection + + def get_issuers(self, namespace: str = None) -> KubectlGetIssuerOutput: + """ + Gets the k8s issuer that are available using 'kubectl get issuer'. + + Args: + namespace (str, optional): the namespace + + Returns: + KubectlGetIssuerOutput: Parsed output of the 'kubectl get issuer' command. + """ + arg_namespace = "" + if namespace: + arg_namespace = f"-n {namespace}" + + kubectl_get_issuer_output = self.ssh_connection.send(export_k8s_config(f"kubectl {arg_namespace} get issuer")) + self.validate_success_return_code(self.ssh_connection) + + issuer_list_output = KubectlGetIssuerOutput(kubectl_get_issuer_output) + + return issuer_list_output + + def wait_for_issuer_status(self, issuer_name: str, is_ready: bool, namespace: str = None, timeout: int = 600) -> None: + """ + Waits timeout amount of time for the given issuer to be in the given status + + Args: + issuer_name (str): the certs issuer name + is_ready (bool): the is_ready status + namespace (str , optional): the namespace + timeout (int): the timeout in secs + + """ + + def get_issuer_status(): + issuer_status = self.get_issuers(namespace).get_issuer(issuer_name).get_ready() + return bool(issuer_status) + + message = f"issuer {issuer_name}'s status is {is_ready}" + validate_equals_with_retry(get_issuer_status, is_ready, message, timeout=600) diff --git a/keywords/k8s/certificate/object/kubectl_cert_object.py b/keywords/k8s/certificate/object/kubectl_cert_object.py new file mode 100644 index 00000000..b8d048e2 --- /dev/null +++ b/keywords/k8s/certificate/object/kubectl_cert_object.py @@ -0,0 +1,78 @@ +class KubectlCertObject: + """ + Class to hold attributes of a 'kubectl get certificate' certificate entry. + """ + + def __init__(self, name: str): + """ + Constructor + + Args: + name (str): Name of the certs. + """ + self.name = name + self.ready = None + self.age = None + + def get_name(self) -> str: + """ + Getter for NAME entry. + + Returns: + str: The name of the certs. + """ + return self.name + + def set_secret(self, secret: str) -> None: + """ + Setter for SECRET + + Args: + secret (str): The secret associated with the certs. + + Returns: None + """ + self.secret = secret + + def get_secret(self) -> str: + """ + Getter for SECRET entry + """ + return self.secret + + def set_ready(self, ready: str) -> None: + """ + Setter for READY + + Args: + ready (str): The ready associated with the certs. + + Returns: None + """ + self.ready = ready + + def get_ready(self) -> str: + """ + Getter for READY entry + """ + return self.ready + + def set_age(self, age: str) -> None: + """ + Setter for AGE. + + Args: + age (str): The age associated with the certs. + + Returns: None + """ + self.age = age + + def get_age(self) -> str: + """ + Getter for AGE entry. + + Returns: + str: The age of the certs. + """ + return self.age diff --git a/keywords/k8s/certificate/object/kubectl_get_certificate_output.py b/keywords/k8s/certificate/object/kubectl_get_certificate_output.py new file mode 100644 index 00000000..4824f104 --- /dev/null +++ b/keywords/k8s/certificate/object/kubectl_get_certificate_output.py @@ -0,0 +1,60 @@ +from starlingx.keywords.k8s.certificate.object.kubectl_cert_object import KubectlCertObject +from starlingx.keywords.k8s.certificate.object.kubectl_get_certificate_table_parser import KubectlGetCertsTableParser + + +class KubectlGetCertsOutput: + """ + This class represents the output of the 'kubectl get certificate' command. + + It parses the raw command output and provides access to certificate objects. + """ + + def __init__(self, kubectl_get_certs_output: str): + """ + Constructor + + Args: + kubectl_get_certs_output (str): Raw string output from running a "kubectl get certificate" command. + + """ + self.kubectl_certs: [KubectlCertObject] = [] + kubectl_get_certs_table_parser = KubectlGetCertsTableParser(kubectl_get_certs_output) + output_values_list = kubectl_get_certs_table_parser.get_output_values_list() + + for pod_dict in output_values_list: + + if "NAME" not in pod_dict: + raise ValueError(f"There is no NAME associated with the pod: {pod_dict}") + + certs = KubectlCertObject(pod_dict["NAME"]) + + if "READY" in pod_dict: + certs.set_ready(pod_dict["READY"]) + + if "SECRET" in pod_dict: + certs.set_secret(pod_dict["SECRET"]) + + if "AGE" in pod_dict: + certs.set_age(pod_dict["AGE"]) + + self.kubectl_certs.append(certs) + + def get_cert(self, certs_name: str) -> KubectlCertObject: + """ + This function will get the pod with the name specified from this get_pods_output. + + Args: + certs_name (str): The name of the certs. + + Returns: + KubectlCertObject: The certificate object with the specified name. + + Raises: + ValueError: If no certificate with the specified name is found. + + """ + for cert in self.kubectl_certs: + if cert.get_name() == certs_name: + return cert + else: + raise ValueError(f"There is no certs with the name {certs_name}.") diff --git a/keywords/k8s/certificate/object/kubectl_get_certificate_table_parser.py b/keywords/k8s/certificate/object/kubectl_get_certificate_table_parser.py new file mode 100644 index 00000000..f4eea1a7 --- /dev/null +++ b/keywords/k8s/certificate/object/kubectl_get_certificate_table_parser.py @@ -0,0 +1,22 @@ +from keywords.k8s.k8s_table_parser_base import K8sTableParserBase + + +class KubectlGetCertsTableParser(K8sTableParserBase): + """ + Class for parsing the output of "kubectl get certificate" commands. + """ + + def __init__(self, k8s_output: str): + """ + Constructor + + Args: + k8s_output (str): The raw String output of a kubernetes command that returns a table. + """ + super().__init__(k8s_output) + self.possible_headers = [ + "NAME", + "READY", + "SECRET", + "AGE", + ] diff --git a/keywords/k8s/certificate/object/kubectl_get_issuer_output.py b/keywords/k8s/certificate/object/kubectl_get_issuer_output.py new file mode 100644 index 00000000..da4b96ef --- /dev/null +++ b/keywords/k8s/certificate/object/kubectl_get_issuer_output.py @@ -0,0 +1,57 @@ +from starlingx.keywords.k8s.certificate.object.kubectl_get_issuer_table_parser import KubectlGetIssuerTableParser +from starlingx.keywords.k8s.certificate.object.kubectl_issuer_object import KubectlIssuerObject + + +class KubectlGetIssuerOutput: + """ + This class represents the output of the 'kubectl get issuer' command. + + It provides methods to parse and retrieve issuer information from the command output. + """ + + def __init__(self, kubectl_get_issuer_output: str): + """ + Constructor + + Args: + kubectl_get_issuer_output (str): Raw string output from running a "kubectl get issuer" command. + + """ + self.kubectl_issuer: [KubectlIssuerObject] = [] + kubectl_get_issuer_table_parser = KubectlGetIssuerTableParser(kubectl_get_issuer_output) + output_values_list = kubectl_get_issuer_table_parser.get_output_values_list() + + for pod_dict in output_values_list: + + if "NAME" not in pod_dict: + raise ValueError(f"There is no NAME associated with the issuer: {pod_dict}") + + issuer = KubectlIssuerObject(pod_dict["NAME"]) + + if "READY" in pod_dict: + issuer.set_ready(pod_dict["READY"]) + + if "AGE" in pod_dict: + issuer.set_age(pod_dict["AGE"]) + + self.kubectl_issuer.append(issuer) + + def get_issuer(self, issuer_name: str) -> KubectlIssuerObject: + """ + This function will get the pod with the name specified from this get_issuer_output. + + Args: + issuer_name (str): The name of the issuer. + + Returns: + KubectlIssuerObject: The issuer object with the specified name. + + Raises: + ValueError: If no issuer with the specified name is found. + + """ + for issuer in self.kubectl_issuer: + if issuer.get_name() == issuer_name: + return issuer + else: + raise ValueError(f"There is no issuer with the name {issuer_name}.") diff --git a/keywords/k8s/certificate/object/kubectl_get_issuer_table_parser.py b/keywords/k8s/certificate/object/kubectl_get_issuer_table_parser.py new file mode 100644 index 00000000..aa3fd6c9 --- /dev/null +++ b/keywords/k8s/certificate/object/kubectl_get_issuer_table_parser.py @@ -0,0 +1,21 @@ +from keywords.k8s.k8s_table_parser_base import K8sTableParserBase + + +class KubectlGetIssuerTableParser(K8sTableParserBase): + """ + Class for parsing the output of "kubectl get issuer" commands. + """ + + def __init__(self, k8s_output: str): + """ + Constructor + + Args: + k8s_output (str): The raw String output of a kubernetes command that returns a table. + """ + super().__init__(k8s_output) + self.possible_headers = [ + "NAME", + "READY", + "AGE", + ] diff --git a/keywords/k8s/certificate/object/kubectl_issuer_object.py b/keywords/k8s/certificate/object/kubectl_issuer_object.py new file mode 100644 index 00000000..7e22eb43 --- /dev/null +++ b/keywords/k8s/certificate/object/kubectl_issuer_object.py @@ -0,0 +1,58 @@ +class KubectlIssuerObject: + """ + Class to hold attributes of a 'kubectl get issuer' pod entry. + """ + + def __init__(self, name: str): + """ + Constructor + + Args: + name (str): Name of the pod. + """ + self.name = name + self.ready = None + self.age = None + + def get_name(self) -> str: + """ + Getter for NAME entry + + """ + return self.name + + def set_ready(self, ready: str) -> None: + """ + Setter for READY + + Args: + ready (str): The ready associated with the issuer. + + Returns: None + + """ + self.ready = ready + + def get_ready(self) -> str: + """ + Getter for READY entry + """ + return self.ready + + def set_age(self, age: str) -> None: + """ + Setter for AGE + + Args: + age (str): The age associated with the issuer. + + Returns: None + + """ + self.age = age + + def get_age(self) -> str: + """ + Getter for AGE entry + """ + return self.age diff --git a/keywords/k8s/pods/kubectl_get_pods_keywords.py b/keywords/k8s/pods/kubectl_get_pods_keywords.py index c51a273a..ff6b55dc 100644 --- a/keywords/k8s/pods/kubectl_get_pods_keywords.py +++ b/keywords/k8s/pods/kubectl_get_pods_keywords.py @@ -1,5 +1,6 @@ import time +from framework.ssh.ssh_connection import SSHConnection from keywords.base_keyword import BaseKeyword from keywords.k8s.k8s_command_wrapper import export_k8s_config from keywords.k8s.pods.object.kubectl_get_pods_output import KubectlGetPodsOutput @@ -10,24 +11,26 @@ class KubectlGetPodsKeywords(BaseKeyword): Class for 'kubectl get pods' keywords """ - def __init__(self, ssh_connection): + def __init__(self, ssh_connection: SSHConnection): """ - Constructor + Initialize the KubectlGetPodsKeywords class. + Args: - ssh_connection: + ssh_connection (SSHConnection): An SSH connection object to the target system. """ self.ssh_connection = ssh_connection def get_pods(self, namespace: str = None) -> KubectlGetPodsOutput: """ Gets the k8s pods that are available using '-o wide'. + Args: - namespace (): + namespace(str, optional): The namespace to search for pods. If None, it will search in all namespaces. Returns: + KubectlGetPodsOutput: An object containing the parsed output of the command. """ - arg_namespace = "" if namespace: arg_namespace = f"-n {namespace}" @@ -40,14 +43,11 @@ class KubectlGetPodsKeywords(BaseKeyword): def get_pods_all_namespaces(self) -> KubectlGetPodsOutput: """ - Gets the k8s pods that are available using '-o wide'. - Args: - namespace (): + Gets the k8s pods that are available using '-o wide' for all namespaces. Returns: - + KubectlGetPodsOutput: An object containing the parsed output of the command. """ - kubectl_get_pods_output = self.ssh_connection.send(export_k8s_config("kubectl -o wide get pods --all-namespaces")) self.validate_success_return_code(self.ssh_connection) pods_list_output = KubectlGetPodsOutput(kubectl_get_pods_output) @@ -57,16 +57,17 @@ class KubectlGetPodsKeywords(BaseKeyword): def wait_for_pod_status(self, pod_name: str, expected_status: str, namespace: str = None, timeout: int = 600) -> bool: """ Waits timeout amount of time for the given pod to be in the given status + Args: - pod_name (): the pod name - expected_status (): the expected status - namespace (): the namespace - timeout (): the timeout in secs + pod_name (str): the pod name + expected_status (str): the expected status + namespace (str): the namespace + timeout (int): the timeout in secs Returns: + bool: True if the pod is in the expected status """ - pod_status_timeout = time.time() + timeout while time.time() < pod_status_timeout: @@ -80,11 +81,13 @@ class KubectlGetPodsKeywords(BaseKeyword): def wait_for_all_pods_status(self, expected_statuses: [str], timeout: int = 600) -> bool: """ Wait for all pods to be in the given status(s) - Args: - expected_statuses (): list of expected statuses ex. ['Completed' , 'Running'] - timeout (): the amount of time in seconds to wait - Returns: True if all expected statuses are met + Args: + expected_statuses ([str]): list of expected statuses ex. ['Completed' , 'Running'] + timeout (int): the amount of time in seconds to wait + + Returns: + bool: True if all expected statuses are met """ pod_status_timeout = time.time() + timeout diff --git a/keywords/k8s/pods/object/kubectl_get_pods_output.py b/keywords/k8s/pods/object/kubectl_get_pods_output.py index 7d074654..8c230c21 100644 --- a/keywords/k8s/pods/object/kubectl_get_pods_output.py +++ b/keywords/k8s/pods/object/kubectl_get_pods_output.py @@ -3,64 +3,72 @@ from keywords.k8s.pods.object.kubectl_pod_object import KubectlPodObject class KubectlGetPodsOutput: + """ + A class to interact with and retrieve information about Kubernetes pods. + + This class provides methods to filter and retrieve pod information + using the `kubectl` command output. + """ def __init__(self, kubectl_get_pods_output: str): """ Constructor Args: - kubectl_get_pods_output: Raw string output from running a "kubectl get pods" command. - + kubectl_get_pods_output (str): Raw string output from running a "kubectl get pods" command. """ - self.kubectl_pod: [KubectlPodObject] = [] kubectl_get_pods_table_parser = KubectlGetPodsTableParser(kubectl_get_pods_output) output_values_list = kubectl_get_pods_table_parser.get_output_values_list() for pod_dict in output_values_list: - if 'NAME' not in pod_dict: + if "NAME" not in pod_dict: raise ValueError(f"There is no NAME associated with the pod: {pod_dict}") - pod = KubectlPodObject(pod_dict['NAME']) + pod = KubectlPodObject(pod_dict["NAME"]) - if 'NAMESPACE' in pod_dict: - pod.set_namespace(pod_dict['NAMESPACE']) + if "NAMESPACE" in pod_dict: + pod.set_namespace(pod_dict["NAMESPACE"]) - if 'READY' in pod_dict: - pod.set_ready(pod_dict['READY']) + if "READY" in pod_dict: + pod.set_ready(pod_dict["READY"]) - if 'STATUS' in pod_dict: - pod.set_status(pod_dict['STATUS']) + if "STATUS" in pod_dict: + pod.set_status(pod_dict["STATUS"]) - if 'RESTARTS' in pod_dict: - pod.set_restarts(pod_dict['RESTARTS']) + if "RESTARTS" in pod_dict: + pod.set_restarts(pod_dict["RESTARTS"]) - if 'AGE' in pod_dict: - pod.set_age(pod_dict['AGE']) + if "AGE" in pod_dict: + pod.set_age(pod_dict["AGE"]) - if 'IP' in pod_dict: - pod.set_ip(pod_dict['IP']) + if "IP" in pod_dict: + pod.set_ip(pod_dict["IP"]) - if 'NODE' in pod_dict: - pod.set_node(pod_dict['NODE']) + if "NODE" in pod_dict: + pod.set_node(pod_dict["NODE"]) - if 'NOMINATED NODE' in pod_dict: - pod.set_nominated_node(pod_dict['NOMINATED NODE']) + if "NOMINATED NODE" in pod_dict: + pod.set_nominated_node(pod_dict["NOMINATED NODE"]) - if 'READINESS GATES' in pod_dict: - pod.set_readiness_gates(pod_dict['READINESS GATES']) + if "READINESS GATES" in pod_dict: + pod.set_readiness_gates(pod_dict["READINESS GATES"]) self.kubectl_pod.append(pod) - def get_pod(self, pod_name) -> KubectlPodObject: + def get_pod(self, pod_name: str) -> KubectlPodObject: """ This function will get the pod with the name specified from this get_pods_output. + Args: - pod_name: The name of the pod of interest. + pod_name (str): The name of the pod of interest. - Returns: KubectlPodObject + Returns: + KubectlPodObject: The pod object with the name specified. + Raises: + ValueError: If the pod with the specified name does not exist in the output. """ for pod in self.kubectl_pod: if pod.get_name() == pod_name: @@ -68,22 +76,44 @@ class KubectlGetPodsOutput: else: raise ValueError(f"There is no pod with the name {pod_name}.") - def get_pods_start_with(self, starts_with) -> [KubectlPodObject]: + def get_pods_start_with(self, starts_with: str) -> [KubectlPodObject]: """ Returns list of pods that starts with 'starts_with' + Args: - starts_with - the str the pod name starts with + starts_with (str): the str the pod name starts with + Returns: + [KubectlPodObject]: list of pods that starts with 'starts_with' """ - pods = list(filter(lambda pod: starts_with in pod.get_name(), self.kubectl_pod)) return pods def get_pods(self) -> [KubectlPodObject]: """ - Gets all pods + Gets all pods. + Returns: + [KubectlPodObject]: A list of all pods. """ return self.kubectl_pod + + def get_unique_pod_matching_prefix(self, starts_with: str) -> str: + """ + Get the full name(s) of pod(s) that start with the given prefix. + + Args: + starts_with(str): The prefix of the pod name. + + Returns: + str: A string if one pod matches + + Raises: + ValueError: If no pods match the prefix. + """ + pods = self.get_pods_start_with(starts_with) + if len(pods) == 0: + raise ValueError(f"No pods found starting with '{starts_with}'.") + return pods[0].get_name() diff --git a/keywords/network/ip_address_keywords.py b/keywords/network/ip_address_keywords.py index 49b59269..bf8b6f45 100644 --- a/keywords/network/ip_address_keywords.py +++ b/keywords/network/ip_address_keywords.py @@ -1,22 +1,26 @@ import ipaddress +import socket from typing import Optional +from config.configuration_manager import ConfigurationManager +from framework.logging.automation_logger import get_logger from keywords.base_keyword import BaseKeyword class IPAddressKeywords(BaseKeyword): """ This class contains all the keywords related to the Internet Protocol (IP) addresses in a general way, that is it, - independently of specific implemented technologies. + + independently of specific implemented technologies. """ def __init__(self, ip: str): """ Constructor + Args: ip (str): a valid IPv4 or IPv6 address representation. """ - self.ip = None if not self.is_valid_ip_address(ip): @@ -27,11 +31,13 @@ class IPAddressKeywords(BaseKeyword): def ipv6_same_network(self, ipv6: str, prefix_length: int) -> bool: """ Verifies if 'ipv6' and 'self.ip' addresses belong to the same network. + Args: ipv6 (str): An IPv6 address as a string. prefix_length (int): The prefix length (network mask). - Returns: True if 'self.ip' and 'ipv6' are in the same network, False otherwise. + Returns: + bool: True if 'self.ip' and 'ipv6' are in the same network, False otherwise. """ if not self.is_valid_ip_address(ipv6): @@ -46,11 +52,13 @@ class IPAddressKeywords(BaseKeyword): def ipv4_same_network(self, ipv4: str, netmask: str) -> bool: """ Verifies if 'ipv4' and 'self.ip' addresses belong to the same network. + Args: ipv4 (str): An IPv6 address as a string. - netmask: The subnet mask as a string. + netmask (str): The subnet mask as a string. - Returns: True if 'self.ip' and 'ipv4' are in the same network, False otherwise. + Returns: + bool: True if 'self.ip' and 'ipv4' are in the same network, False otherwise. """ if not self.is_valid_ip_address(ipv4): @@ -67,11 +75,13 @@ class IPAddressKeywords(BaseKeyword): def ipv4_with_prefix_length_same_network(self, ipv4: str, prefix_length: int) -> bool: """ Verifies if 'ipv4' and 'self.ip' addresses belong to the same network. + Args: ipv4 (str): An IPv6 address as a string. prefix_length (int): The number of leading bits that corresponds to the network prefix in the IP address. - Returns: True if 'self.ip' and 'ipv4' are in the same network; False otherwise. + Returns: + bool: True if 'self.ip' and 'ipv4' are in the same network; False otherwise. """ if not self.is_valid_ip_address(ipv4): @@ -85,11 +95,13 @@ class IPAddressKeywords(BaseKeyword): def ip_same_network(self, ip: str, prefix_length: int) -> bool: """ Verifies if 'ipv6' and 'self.ip' addresses belong to the same network. + Args: ip (str): An IPv4 or IPv6 address as a string. prefix_length (int): The prefix length. - Returns: True if 'self.ip' and 'ip' are in the same network; False otherwise. + Returns: + bool: True if 'self.ip' and 'ip' are in the same network; False otherwise. """ if not self.is_valid_ip_address(ip): @@ -106,14 +118,15 @@ class IPAddressKeywords(BaseKeyword): else: return False - def is_valid_ip_address(self, ip_address) -> bool: + def is_valid_ip_address(self, ip_address: str) -> bool: """ Check if 'ip_address' is a valid IPv4 or IPv6 address. + Args: - ip_address: a supposed valid either IPv4 or IPv6 address. + ip_address (str): a supposed valid either IPv4 or IPv6 address. Returns: - boolean: True if 'ip_address' is valid IP; False otherwise. + bool: True if 'ip_address' is valid IP; False otherwise. """ try: ipaddress.ip_address(ip_address) @@ -124,10 +137,12 @@ class IPAddressKeywords(BaseKeyword): def check_ip_version(self, ip: str) -> Optional[str]: """ Check if 'ip' is either an IPv4 or IPv6 address. + + Args: + ip (str): a supposed valid either IPv4 or IPv6 address. + Returns: - str: "IPv4" if 'ip' is an IPv4; - "IPv6" if "ip" is an IPv6; - None, otherwise. + Optional[str]: 'IPv4' if the address is IPv4, 'IPv6' if the address is IPv6, or None if invalid. """ try: ip_obj = ipaddress.ip_address(ip) @@ -138,3 +153,25 @@ class IPAddressKeywords(BaseKeyword): return "IPv4" elif ip_obj.version == 6: return "IPv6" + + def check_dnsname_resolution(self, dns_name: str) -> bool: + """ + Method to verify the dnsname resolution of the lab + + Args: + dns_name (str): a supposed valid dns name. + + Returns: + bool: True if the DNS name resolves to an IP address, False otherwise. + """ + family = socket.AF_INET + lab_config = ConfigurationManager.get_lab_config() + oam_fip = lab_config.get_floating_ip() + if self.check_ip_version(oam_fip) == "IPv6": + family = socket.AF_INET6 + try: + socket.getaddrinfo(dns_name, None, family) + return True + except socket.error as msg: + get_logger().log_error(f"nslookup failed with error '{msg}'") + return False diff --git a/resources/cloud_platform/security/cert_manager/global_policy.yaml b/resources/cloud_platform/security/cert_manager/global_policy.yaml new file mode 100644 index 00000000..109b0ac8 --- /dev/null +++ b/resources/cloud_platform/security/cert_manager/global_policy.yaml @@ -0,0 +1,16 @@ +apiVersion: crd.projectcalico.org/v1 +kind: GlobalNetworkPolicy +metadata: + name: gnp-oam-overrides +spec: + ingress: + - action: Allow + destination: + ports: + - 80 + - 443 + protocol: TCP + order: 500 + selector: has(iftype) && iftype == 'oam' + types: + - Ingress \ No newline at end of file diff --git a/resources/cloud_platform/security/cert_manager/kuard.yaml b/resources/cloud_platform/security/cert_manager/kuard.yaml new file mode 100644 index 00000000..f17d5d7c --- /dev/null +++ b/resources/cloud_platform/security/cert_manager/kuard.yaml @@ -0,0 +1,24 @@ +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + annotations: + cert-manager.io/issuer: stepca-issuer + kubernetes.io/ingress.class: nginx + name: kuard + namespace: pvtest +spec: + rules: + - host: '{{ dns_name }}' + http: + paths: + - backend: + service: + name: kuard + port: + number: 80 + path: / + pathType: Prefix + tls: + - hosts: + - '{{ dns_name }}' + secretName: kuard-ingress-tls \ No newline at end of file diff --git a/testcases/cloud_platform/regression/security/test_cert_manager.py b/testcases/cloud_platform/regression/security/test_cert_manager.py new file mode 100644 index 00000000..c96ede18 --- /dev/null +++ b/testcases/cloud_platform/regression/security/test_cert_manager.py @@ -0,0 +1,68 @@ +from pytest import mark + +from config.configuration_manager import ConfigurationManager +from framework.resources.resource_finder import get_stx_resource_path +from framework.validation.validation import validate_equals +from keywords.cloud_platform.rest.cloud_rest_client import CloudRestClient +from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords +from keywords.files.file_keywords import FileKeywords +from keywords.files.yaml_keywords import YamlKeywords +from keywords.k8s.certificate.kubectl_get_certificate_keywords import KubectlGetCertStatusKeywords +from keywords.k8s.certificate.kubectl_get_issuer_keywords import KubectlGetCertIssuerKeywords +from keywords.k8s.pods.kubectl_apply_pods_keywords import KubectlApplyPodsKeywords +from keywords.k8s.pods.kubectl_get_pods_keywords import KubectlGetPodsKeywords +from keywords.network.ip_address_keywords import IPAddressKeywords + + +@mark.p0 +def test_app_using_nginx_controller(): + """ + This test is to deploy an application which uses Nginx Ingress controller using a + certificate signed by External CA(acme stepCA) + + Steps: + - Deploy and apply the app file + - Deploy and apply the globalnetworkpolicy for the acme challenge + - Verify app status + - Verify cert is issued from StepCa + - Check the app url + + """ + ssh_connection = LabConnectionKeywords().get_active_controller_ssh() + lab_config = ConfigurationManager.get_lab_config() + oam_ip = lab_config.get_floating_ip() + dns_name = ConfigurationManager.get_security_config().get_dns_name() + dns_resolution_status = IPAddressKeywords(oam_ip).check_dnsname_resolution(dns_name=dns_name) + validate_equals(dns_resolution_status, True, "Verify the dns name resolution") + stepca_issuer = "stepca-issuer" + pod_name = "kuard" + cert = "kuard-ingress-tls" + base_url = f"https://{dns_name}/" + deploy_app_file_name = "deploy_app.yaml" + global_policy_file_name = "global_policy.yaml" + kuard_file_name = "kuard.yaml" + namespace = "pvtest" + + file_keywords = FileKeywords(ssh_connection) + file_keywords.upload_file(get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{deploy_app_file_name}"), f"/home/sysadmin/{deploy_app_file_name}", overwrite=False) + file_keywords.upload_file(get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{global_policy_file_name}"), f"/home/sysadmin/{global_policy_file_name}", overwrite=False) + KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(f"/home/sysadmin/{global_policy_file_name}") + KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(f"/home/sysadmin/{deploy_app_file_name}") + # Check the issuer status + KubectlGetCertIssuerKeywords(ssh_connection).wait_for_issuer_status(stepca_issuer, True, namespace) + # Check the ingress pod status + get_pod_obj = KubectlGetPodsKeywords(ssh_connection) + pod_name = get_pod_obj.get_pods(namespace=namespace).get_unique_pod_matching_prefix(starts_with=pod_name) + + pod_status = KubectlGetPodsKeywords(ssh_connection).wait_for_pod_status(pod_name, "Running", namespace) + validate_equals(pod_status, True, "Verify ingress pods are running") + + template_file = get_stx_resource_path(f"resources/cloud_platform/security/cert_manager/{kuard_file_name}") + replacement_dictionary = {"dns_name": dns_name} + nginx_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(template_file, replacement_dictionary, f"{kuard_file_name}", "/home/sysadmin") + KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(nginx_yaml) + # Check the cert status + KubectlGetCertStatusKeywords(ssh_connection).wait_for_certs_status(cert, True, namespace) + # Check the app url + response = CloudRestClient().get(f"{base_url}") + validate_equals(response.get_status_code(), 200, "Verify the app url is reachable")