Addition of OIDC app testcases

Change-Id: Idedb362f67527c49f297e54bd3a3ee2db0f2e255
This commit is contained in:
Francischini de Souza, Gabriel
2025-04-21 16:06:24 -03:00
parent 6da45b4c9a
commit 98f77b4871
21 changed files with 738 additions and 22 deletions

View File

@@ -1,5 +1,6 @@
{
base_application_path: "/usr/local/share/applications/helm/",
istio_app_name: "istio",
metric_server_app_name: "metrics-server"
metric_server_app_name: "metrics-server",
oidc_app_name: "oidc-auth-apps"
}

View File

@@ -17,6 +17,7 @@ class AppConfig:
self.base_application_path = app_dict["base_application_path"]
self.istio_app_name = app_dict["istio_app_name"]
self.metric_server_app_name = app_dict["metric_server_app_name"]
self.oidc_app_name = app_dict["oidc_app_name"]
def get_base_application_path(self) -> str:
"""
@@ -47,3 +48,13 @@ class AppConfig:
"""
return self.metric_server_app_name
def get_oidc_app_name(self) -> str:
"""
Getter for oidc app name
Returns:
str: the oidc app name
"""
return self.oidc_app_name

View File

@@ -0,0 +1,118 @@
class SystemAddrpoolListObject:
"""
Class to handle the data provided by the 'system addrpool-list' command execution. This command generates the
output table shown below, where each object of this class represents a single row in that table.
+--------------------------------------+-----------------------------+----------+--------+--------+---------------------------+------------------+---------------------+---------------------+-----------------+
| uuid | name | network | prefix | order | ranges | floating_address | controller0_address | controller1_address | gateway_address |
+--------------------------------------+-----------------------------+----------+--------+--------+---------------------------+------------------+---------------------+---------------------+-----------------+
| 8091e435-996a-4543-bce3-00b283a22075 | cluster-host-subnet-ipv6 | aefd:205 | 64 | random | ['aefd:205::1-aefd:205:: | aefd:205::1 | aefd:205::2 | aefd:205::3 | None |
| | | :: | | | ffff:ffff:ffff:fffe'] | | | | |
| | | | | | | | | | |
| b5a65ed0-7370-49cf-b089-ba19b23c47e1 | cluster-pod-subnet-ipv6 | aefd:206 | 64 | random | ['aefd:206::1-aefd:206:: | None | None | None | None |
| | | :: | | | ffff:ffff:ffff:fffe'] | | | | |
| | | | | | | | | | |
| 696f3f8f-7e3b-4974-990c-99ae904bc808 | cluster-service-subnet-ipv6 | aefd:207 | 112 | random | ['aefd:207::1-aefd:207:: | None | None | None | None |
| | | :: | | | fffe'] | | | | |
| | | | | | | | | | |
| 6f437a9f-105e-4e3d-956d-5c0a5eaa0b30 | management-ipv6 | fdff:10: | 64 | random | ['fdff:10:80:237::2-fdff: | fdff:10:80:237:: | fdff:10:80:237::3 | fdff:10:80:237::4 | None |
| | | 80:237:: | | | 10:80:237::ffff'] | 2 | | | |
| | | | | | | | | | |
| 27a769b0-105f-41f2-a389-c49012b10233 | multicast-subnet-ipv6 | ff05::80 | 112 | random | ['ff05::80:237:0:1-ff05:: | None | None | None | None |
| | | :237:0:0 | | | 80:237:0:fffe'] | | | | |
| | | | | | | | | | |
| 7e82fd4c-d3bb-4766-817d-32d4cd69729c | oam-ipv6 | 2620:10a | 64 | random | ['2620:10a:a001:aa0c:: | 2620:10a:a001: | None | None | 2620:10a:a001: |
| | | :a001: | | | 1-2620:10a:a001:aa0c:ffff | aa0c::216 | | | aa0c::1 |
| | | aa0c:: | | | :ffff:ffff:fffe'] | | | | |
| | | | | | | | | | |
| 82a770b0-15da-4d96-ad87-1b5639f3aec2 | pxeboot | 192.168. | 24 | random | ['192.168.202.1-192.168. | 192.168.202.1 | 192.168.202.2 | 192.168.202.3 | None |
| | | 202.0 | | | 202.254'] | | | | |
| | | | | | | | | | |
+--------------------------------------+-----------------------------+----------+--------+--------+---------------------------+------------------+---------------------+---------------------+-----------------+
"""
def __init__(
self,
uuid,
name,
network,
order,
ranges,
floating_address,
controller0_address,
controller1_address,
gateway_address
):
self.uuid = uuid
self.name = name
self.network = network
self.order = order
self.ranges = ranges
self.floating_address = floating_address
self.controller0_address = controller0_address
self.controller1_address = controller1_address
self.gateway_address = gateway_address
def get_uuid(self) -> str:
"""
Getter for uuid
Returns: the uuid
"""
return self.uuid
def get_name(self) -> str:
"""
Getter for name
Returns: the name
"""
return self.name
def get_network(self) -> str:
"""
Getter for network
Returns: the network
"""
return self.network
def get_order(self) -> str:
"""
Getter for order
Returns: the order
"""
return self.order
def get_ranges(self) -> str:
"""
Getter for ranges
Returns: the ranges
"""
return self.ranges
def get_floating_address(self) -> str:
"""
Getter for floating_address
Returns: the floating_address
"""
return self.floating_address
def get_controller0_address(self) -> str:
"""
Getter for controller0_address
Returns: the controller0_address
"""
return self.controller0_address
def get_controller1_address(self) -> str:
"""
Getter for controller1_address
Returns: the controller1_address
"""
return self.controller1_address
def get_gateway_address(self) -> str:
"""
Getter for gateway_address
Returns: the gateway_address
"""
return self.gateway_address

View File

@@ -0,0 +1,106 @@
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from keywords.cloud_platform.system.addrpool.object.system_addrpool_list_object import SystemAddrpoolListObject
from keywords.cloud_platform.system.system_table_parser import SystemTableParser
class SystemAddrpoolListOutput:
"""
This class parses the output of the command 'system addrpool-list'
The parsing result is a 'SystemAddrpoolListObject' instance.
Example:
'system addrpool-list'
+--------------------------------------+-----------------------------+----------+--------+--------+---------------------------+------------------+---------------------+---------------------+-----------------+
| uuid | name | network | prefix | order | ranges | floating_address | controller0_address | controller1_address | gateway_address |
+--------------------------------------+-----------------------------+----------+--------+--------+---------------------------+------------------+---------------------+---------------------+-----------------+
| 1e836335-80a0-427c-ae18-87dbe5f0e20e | cluster-host-subnet-ipv4 | 192.168. | 24 | random | ['192.168.206.1-192.168. | 192.168.206.1 | 192.168.206.2 | 192.168.206.3 | None |
| | | 206.0 | | | 206.254'] | | | | |
| | | | | | | | | | |
| 88062da8-7839-4a99-b051-e24ef6c8bf75 | cluster-pod-subnet-ipv4 | 172.16.0 | 16 | random | ['172.16.0.1-172.16.255. | None | None | None | None |
| | | .0 | | | 254'] | | | | |
| | | | | | | | | | |
| 1d00691b-f01e-46e9-9e54-916d766d1277 | cluster-service-subnet-ipv4 | 10.96.0. | 12 | random | ['10.96.0.1-10.111.255. | None | None | None | None |
| | | 0 | | | 254'] | | | | |
| | | | | | | | | | |
| 38d18176-481e-4ef2-9540-25e448b34dc0 | management-ipv4 | 10.8.69. | 24 | random | ['10.8.69.2-10.8.69.254'] | 10.8.69.2 | 10.8.69.3 | 10.8.69.4 | None |
| | | 0 | | | | | | | |
| | | | | | | | | | |
| ccf0d793-0ed0-4d6e-a407-7186cf391955 | multicast-subnet-ipv4 | 239.1.1. | 28 | random | ['239.1.1.1-239.1.1.14'] | None | None | None | None |
| | | 0 | | | | | | | |
| | | | | | | | | | |
| c3f4f28e-aa84-47ec-843e-2e1efcff563e | oam-ipv4 | 128.224. | 23 | random | ['128.224.48.1-128.224.49 | 128.224.48.232 | None | None | 128.224.48.1 |
| | | 48.0 | | | .254'] | | | | |
| | | | | | | | | | |
| b46bc265-e5e5-4980-b476-35599ebe5961 | pxeboot | 192.168. | 24 | random | ['192.168.202.1-192.168. | 192.168.202.1 | 192.168.202.2 | 192.168.202.3 | None |
| | | 202.0 | | | 202.254'] | | | | |
| | | | | | | | | | |
+--------------------------------------+-----------------------------+----------+--------+--------+---------------------------+------------------+---------------------+---------------------+-----------------+
"""
def __init__(self, system_addrpool_list_output):
"""
Constructor
Args:
system_addrpool_list_output: the output of the command 'system addrpool-list'
"""
self.system_addrpool: [SystemAddrpoolListObject] = []
system_table_parser = SystemTableParser(system_addrpool_list_output)
output_values = system_table_parser.get_output_values_list()
for value in output_values:
if self.is_valid_output(value):
self.system_addrpool.append(
SystemAddrpoolListObject(
value['uuid'],
value['name'],
value['network'],
value['order'],
value['ranges'],
value['floating_address'],
value['controller0_address'],
value['controller1_address'],
value['gateway_address'],
)
)
else:
raise KeywordException(f"The output line {value} was not valid")
def get_addrpool(self) -> [SystemAddrpoolListObject]:
"""
Returns the list of addrpool objects
"""
return self.system_addrpool
def get_floating_address_by_name(self, name: str) -> str:
"""
Gets the floating address for the given name.
Args:
name: the name of the desired addrpool
Returns:
The floating address of the addrpool with the specified name.
"""
addrpools = list(filter(lambda pool: pool.get_name() == name, self.system_addrpool))
if not addrpools:
raise KeywordException(f"No addrpool with name {name} was found.")
return addrpools[0].get_floating_address()
@staticmethod
def is_valid_output(value):
required_keys = ['uuid', 'name', 'network', 'order', 'ranges', 'floating_address', 'controller0_address', 'controller1_address', 'gateway_address']
for key in required_keys:
if key not in value:
get_logger().log_error(f'{key} is not in the output value: {value}')
return False
return True

View File

@@ -0,0 +1,45 @@
from config.configuration_manager import ConfigurationManager
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.addrpool.object.system_addrpool_list_output import SystemAddrpoolListOutput
class SystemAddrpoolListKeywords(BaseKeyword):
"""
This class contains all the keywords related to the 'system addrpool' commands.
"""
def __init__(self, ssh_connection):
"""
Constructor
Args:
ssh_connection:
"""
self.ssh_connection = ssh_connection
def get_system_addrpool_list(self) -> SystemAddrpoolListOutput:
"""
Gets a SystemAddrpoolOutput object related to the execution of the 'system addrpool-list' command.
Returns:
SystemAddrpoolListOutput: an instance of the SystemAddrpoolOutput object representing the
address pool of IPs on the host, as a result of the execution of the 'system addrpool-list' command.
"""
output = self.ssh_connection.send(source_openrc('system addrpool-list'))
self.validate_success_return_code(self.ssh_connection)
system_addrpool_list_output = SystemAddrpoolListOutput(output)
return system_addrpool_list_output
def get_management_floating_address(self) -> str:
"""
Retrieves the floating address for the addrpool with name 'management-ipv4' or 'management-ipv6' depending on lab IP type.
Returns:
The floating address for the management-ipv4/ipv6.
"""
management_ip_type = "management-ipv6" if ConfigurationManager.get_lab_config().is_ipv6() else "management-ipv4"
system_addrpool_list_output = self.get_system_addrpool_list()
return system_addrpool_list_output.get_floating_address_by_name(management_ip_type)

View File

@@ -0,0 +1,32 @@
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
class SystemHelmKeywords(BaseKeyword):
"""
This class contains all the keywords related to the 'system helm' commands.
"""
def __init__(self, ssh_connection):
"""
Constructor
Args:
ssh_connection:
"""
self.ssh_connection = ssh_connection
def helm_override_update(self, app_name: str, chart_name: str, namespace: str, values: str):
"""
Update helm chart user overrides.
Args:
app_name (str): Name of the application
chart_name (str): Name of the chart
namespace (str): Namespace of chart overrides
values (str): YAML file containing helm chart override values
"""
command = source_openrc(f"system helm-override-update {app_name} {chart_name} {namespace} --values {values}")
self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)

View File

@@ -2,6 +2,8 @@ from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.system.service.objects.system_service_output import SystemServiceOutput
from keywords.cloud_platform.system.service.objects.system_service_show_output import SystemServiceShowOutput
from keywords.k8s.pods.kubectl_get_pods_keywords import KubectlGetPodsKeywords
from time import sleep
class SystemServiceKeywords(BaseKeyword):
@@ -48,4 +50,27 @@ class SystemServiceKeywords(BaseKeyword):
output = self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
system_service_show_output = SystemServiceShowOutput(output)
return system_service_show_output
return system_service_show_output
def add_service_parameter(self, service: str, parameter: str, value: str):
"""
Adds a service parameter.
Args:
service (str): The service name.
parameter (str): The parameter to add.
value (str): The value of the parameter.
"""
command = source_openrc(f'system service-parameter-add {service} {parameter}={value}')
self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
def apply_kubernetes_service_parameters(self):
"""
Applies kubernetes service parameters and waits for it to restart.
"""
command = source_openrc(f'system service-parameter-apply kubernetes')
self.ssh_connection.send(command)
self.validate_success_return_code(self.ssh_connection)
KubectlGetPodsKeywords(self.ssh_connection).wait_for_kubernetes_to_restart()

View File

@@ -83,6 +83,20 @@ class FileKeywords(BaseKeyword):
get_logger().log_info(f"{file_name} does not exist.")
return False
def create_file_with_echo(self, file_name: str, content: str) -> bool:
"""
Creates a file based on its content with the echo command.
Args:
file_name (str): the file name.
content (str): content to be added in the file.
Returns:
bool: True if create successful, False otherwise.
"""
self.ssh_connection.send(f"echo '{content}' > {file_name}")
return self.file_exists(file_name)
def delete_file(self, file_name: str) -> bool:
"""
Deletes the file.

View File

@@ -1,6 +1,8 @@
import time
from framework.ssh.ssh_connection import SSHConnection
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.k8s.k8s_command_wrapper import export_k8s_config
from keywords.k8s.pods.object.kubectl_get_pods_output import KubectlGetPodsOutput
@@ -40,6 +42,30 @@ class KubectlGetPodsKeywords(BaseKeyword):
pods_list_output = KubectlGetPodsOutput(kubectl_get_pods_output)
return pods_list_output
def get_pods_no_validation(self, namespace: str = None) -> KubectlGetPodsOutput:
"""
Gets the k8s pods that are available using '-o wide'.
Args:
namespace(str, optional): The namespace to search for pods. If None, it will search in all namespaces.
Returns:
KubectlGetPodsOutput: An object containing the parsed output of the command.
"""
arg_namespace = ""
if namespace:
arg_namespace = f"-n {namespace}"
kubectl_get_pods_output = self.ssh_connection.send(export_k8s_config(f"kubectl {arg_namespace} -o wide get pods"))
rc = self.ssh_connection.get_return_code()
if rc != 0:
return None
pods_list_output = KubectlGetPodsOutput(kubectl_get_pods_output)
return pods_list_output
def get_pods_all_namespaces(self) -> KubectlGetPodsOutput:
"""
@@ -71,9 +97,11 @@ class KubectlGetPodsKeywords(BaseKeyword):
pod_status_timeout = time.time() + timeout
while time.time() < pod_status_timeout:
pod_status = self.get_pods(namespace).get_pod(pod_name).get_status()
if pod_status == expected_status:
return True
pods_output = self.get_pods_no_validation()
if pods_output:
pod_status = self.get_pods(namespace).get_pod(pod_name).get_status()
if pod_status == expected_status:
return True
time.sleep(5)
return False
@@ -100,6 +128,7 @@ class KubectlGetPodsKeywords(BaseKeyword):
time.sleep(5)
return False
def wait_for_pods_to_reach_status(self, expected_status: str, pod_names: list, namespace: str = None, poll_interval: int = 5, timeout: int = 180) -> bool:
"""
Waits timeout amount of time for the given pod in a namespace to be in the given status
@@ -125,4 +154,45 @@ class KubectlGetPodsKeywords(BaseKeyword):
time.sleep(poll_interval)
raise KeywordException(f"Pods {pod_names} in namespace {namespace} did not reach status {expected_status} within {timeout} seconds")
def wait_for_kubernetes_to_restart(self, timeout: int = 600, check_interval: int = 20) -> bool:
"""
Wait for the Kubernetes API to go down, then wait for the kube-apiserver pod to be Running.
Args:
timeout (int): Maximum time to wait in seconds.
check_interval (int): Time between checks in seconds.
Returns:
bool: True if Kubernetes restarted and kube-apiserver pod becomes Running.
Raises:
TimeoutError: If the Kubernetes API doesn't restart properly.
"""
def is_kubernetes_up() -> bool:
output = self.ssh_connection.send(export_k8s_config("kubectl get pods -A"))
return "was refused - did you specify the right host or port?" not in output[0]
validate_equals_with_retry(
function_to_execute=is_kubernetes_up,
expected_value=False,
validation_description="Kubernetes is down for a restart",
timeout=timeout,
polling_sleep_time=check_interval,
)
validate_equals_with_retry(
function_to_execute=is_kubernetes_up,
expected_value=True,
validation_description="Kubernetes is back up after the restart",
timeout=timeout,
polling_sleep_time=check_interval,
)
return self.wait_for_pod_status(
pod_name="kube-apiserver-controller-0",
expected_status="Running",
namespace="kube-system",
timeout=timeout
)

View File

@@ -1,7 +1,9 @@
from config.docker.objects.registry import Registry
from keywords.base_keyword import BaseKeyword
from framework.logging.automation_logger import get_logger
from framework.exceptions.keyword_exception import KeywordException
from keywords.k8s.k8s_command_wrapper import export_k8s_config
from keywords.k8s.secret.kubectl_get_secret_keywords import KubectlGetSecretsKeywords
class KubectlCreateSecretsKeywords(BaseKeyword):
"""
@@ -34,14 +36,29 @@ class KubectlCreateSecretsKeywords(BaseKeyword):
export_k8s_config(f"kubectl create secret -n {namespace} docker-registry {secret_name} --docker-server={docker_server} " f"--docker-username={user_name} --docker-password={password}")
)
def create_secret_generic(self, secret_name: str, tls_crt: str, tls_key: str, namespace: str):
def create_secret_generic(self, namespace: str, secret_name: str, tls_crt: str, tls_key: str = None):
"""
Create a generic secret
Create a generic secret, with explicit filename, or tls.crt / tls.key
Args:
namespace (str): namespace
secret_name (str): the secret name
tls_crt (str): tls_crt file name
tls_key (str): tls_key file name
namespace (str): namespace
tls_key (str): tls_key file name (optional)
"""
self.ssh_connection.send(export_k8s_config(f"kubectl create -n {namespace} secret generic {secret_name} --from-file=tls.crt={tls_crt} --from-file=tls.key={tls_key}"))
base_cmd = f"kubectl create -n {namespace} secret generic {secret_name}"
if tls_key:
base_cmd += f" --from-file=tls.crt={tls_crt} --from-file=tls.key={tls_key}"
else:
base_cmd += f" --from-file={tls_crt}"
self.ssh_connection.send(export_k8s_config(base_cmd))
self.validate_success_return_code(self.ssh_connection)
list_of_secrets = KubectlGetSecretsKeywords(self.ssh_connection).get_secret_names(namespace=namespace)
if secret_name in list_of_secrets:
get_logger().log_info(f"Kubernetes secret {secret_name} created successfully.")
else:
raise KeywordException(f"Failed to create Kubernetes secret {secret_name}.")

View File

@@ -3,16 +3,13 @@ from keywords.base_keyword import BaseKeyword
from keywords.k8s.k8s_command_wrapper import export_k8s_config
from keywords.k8s.secret.object.kubectl_get_secret_output import KubectlGetSecretOutput
class KubectlGetSecretsKeywords(BaseKeyword):
"""
Keyword class for retrieving Kubernetes secrets.
"""
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor.
Args:
ssh_connection (SSHConnection): The SSH connection object.
"""
@@ -21,28 +18,45 @@ class KubectlGetSecretsKeywords(BaseKeyword):
def get_secrets(self, namespace: str = "default") -> KubectlGetSecretOutput:
"""
Runs `kubectl get secrets` and returns a parsed output object.
Args:
namespace (str): Kubernetes namespace
Returns:
KubectlGetSecretOutput: Parsed secrets list
"""
cmd = f"kubectl get secrets -n {namespace}"
output = self.ssh_connection.send(export_k8s_config(cmd))
self.validate_success_return_code(self.ssh_connection)
return KubectlGetSecretOutput(output)
def get_secret_names(self, namespace: str = "default") -> list[str]:
"""
Returns a list of secret names in the given namespace.
Args:
namespace (str): Kubernetes namespace
Returns:
list[str]: Secret names
"""
secrets_output = self.get_secrets(namespace)
return [secret.get_name() for secret in secrets_output.kubectl_secret]
def get_secret_with_custom_output(self, secret_name: str, namespace: str, output_format: str, extra_parameters: str = None, base64: bool = False) -> str:
"""
Get a Kubernetes secret with a custom output format and optional extra parameters.
Args:
secret_name (str): The name of the secret to retrieve.
namespace (str): The namespace where the secret is located.
output_format (str): The output format (e.g., jsonpath, yaml, etc.).
extra_parameters (str, optional): Additional parameters for the output format.
Returns:
str: The output from the kubectl get command.
"""
command = f"kubectl get secret {secret_name} -n {namespace} -o {output_format}"
if extra_parameters:
command += f"={extra_parameters}"
if base64:
command += f" | base64 --decode"
output = self.ssh_connection.send(export_k8s_config(command))
return ''.join(output)

View File

@@ -0,0 +1,25 @@
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
class KeyringKeywords(BaseKeyword):
"""
Keyring Keywords class
"""
def __init__(self, ssh_connection: SSHConnection):
self.ssh_connection = ssh_connection
def get_keyring(self, service: str, identifier: str) -> str:
"""
Gets a value from the keyring.
Args:
service (str): keyring service
identifier (str): keyring identifier
Returns:
The value from the keyring.
"""
keyring_value = self.ssh_connection.send(f"keyring get {service} {identifier}")
self.validate_success_return_code(self.ssh_connection)
return keyring_value[0].strip()

View File

@@ -0,0 +1,51 @@
# To use this yaml file, you must have the following variables defined:
# - oam_ip: The oam ip of the lab
# - mgmt_ip: The management ip of the lab
# - bind_pw: The bind password to log in
config:
staticClients:
- id: stx-oidc-client-app
name: STX OIDC Client app
redirectURIs: ['https://"{{ oam_ip }}":30555/callback']
secret: stx-oidc-client-p@ssw0rd
expiry:
idTokens: "10h"
connectors:
- type: ldap
name: LocalLDAP
id: localldap-1
config:
host: '"{{ mgmt_ip }}":636'
rootCA: /etc/ssl/certs/adcert/local-ldap-ca-cert.crt
insecureNoSSL: false
insecureSkipVerify: false
bindDN: CN=ldapadmin,DC=cgcs,DC=local
bindPW: "{{ bind_pw }}"
usernamePrompt: Username
userSearch:
baseDN: ou=People,dc=cgcs,dc=local
filter: "(objectClass=posixAccount)"
username: uid
idAttr: DN
emailAttr: uid
nameAttr: gecos
groupSearch:
baseDN: ou=Group,dc=cgcs,dc=local
filter: "(objectClass=posixGroup)"
userMatchers:
- userAttr: uid
groupAttr: memberUid
nameAttr: cn
volumeMounts:
- mountPath: /etc/ssl/certs/adcert
name: certdir
- mountPath: /etc/dex/tls
name: https-tls
volumes:
- name: certdir
secret:
secretName: local-ldap-ca-cert
- name: https-tls
secret:
defaultMode: 420
secretName: oidc-auth-apps-certificate

View File

@@ -0,0 +1,22 @@
# To use this yaml file, you must have the following variable defined:
# - oam_ip: The oam ip of the lab
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: oidc-auth-apps-certificate
namespace: kube-system
spec:
secretName: oidc-auth-apps-certificate
duration: 2160h # 90 days
renewBefore: 360h # 15 days
issuerRef:
name: system-local-ca
kind: ClusterIssuer
commonName: "{{ oam_ip }}"
subject:
organizations:
- ABC-Company
organizationalUnits:
- StarlingX-system-oidc-auth-apps
ipAddresses:
- "{{ oam_ip }}"

View File

@@ -0,0 +1,8 @@
tlsName: oidc-auth-apps-certificate
config:
# The |OIDC|-client container mounts the dex-ca-cert secret at /home, therefore
# issuer_root_ca: /home/<filename-only-of-generic-secret>
issuer_root_ca: /home/dex-ca-cert.crt
issuer_root_ca_secret: dex-ca-cert
# secret for accessing dex
client_secret: stx-oidc-client-p@ssw0rd

View File

@@ -0,0 +1,11 @@
cronSchedule: "*/15 * * * *"
observedSecrets:
- secretName: "dex-ca-cert"
filename: "dex-ca-cert.crt"
deploymentToRestart: "stx-oidc-client"
- secretName: "oidc-auth-apps-certificate"
filename: "tls.crt"
deploymentToRestart: "stx-oidc-client"
- secretName: "oidc-auth-apps-certificate"
filename: "tls.crt"
deploymentToRestart: "oidc-dex"

View File

@@ -0,0 +1,143 @@
from config.configuration_manager import ConfigurationManager
from keywords.files.file_keywords import FileKeywords
from keywords.files.yaml_keywords import YamlKeywords
from keywords.linux.keyring.keyring_keywords import KeyringKeywords
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.cloud_platform.system.addrpool.system_addrpool_list_keywords import SystemAddrpoolListKeywords
from keywords.cloud_platform.system.helm.system_helm_keywords import SystemHelmKeywords
from keywords.cloud_platform.system.application.system_application_list_keywords import SystemApplicationListKeywords
from keywords.cloud_platform.system.application.system_application_upload_keywords import SystemApplicationUploadKeywords
from keywords.cloud_platform.system.application.system_application_apply_keywords import SystemApplicationApplyKeywords
from keywords.cloud_platform.system.application.system_application_remove_keywords import SystemApplicationRemoveKeywords
from keywords.cloud_platform.system.application.system_application_delete_keywords import SystemApplicationDeleteKeywords
from keywords.cloud_platform.system.application.system_application_upload_keywords import SystemApplicationUploadInput
from keywords.cloud_platform.system.application.system_application_remove_keywords import SystemApplicationRemoveInput
from keywords.cloud_platform.system.application.system_application_delete_keywords import SystemApplicationDeleteInput
from keywords.cloud_platform.system.application.object.system_application_status_enum import SystemApplicationStatusEnum
from keywords.cloud_platform.system.service.system_service_keywords import SystemServiceKeywords
from keywords.k8s.pods.kubectl_apply_pods_keywords import KubectlApplyPodsKeywords
from keywords.k8s.secret.kubectl_create_secret_keywords import KubectlCreateSecretsKeywords
from keywords.k8s.secret.kubectl_get_secret_keywords import KubectlGetSecretsKeywords
from framework.resources.resource_finder import get_stx_resource_path
from framework.validation.validation import validate_equals, validate_not_equals
def test_install_oidc():
"""
Install (Upload and Apply) Application OIDC
Raises:
Exception: If application OIDC failed to upload or apply
"""
# Setups app configs, obj instances and lab connection
app_config = ConfigurationManager.get_app_config()
base_path = app_config.get_base_application_path()
oidc_name = app_config.get_oidc_app_name()
file_path_oidc: str = "/home/sysadmin/oidc/"
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
file_keywords = FileKeywords(ssh_connection)
helm_keywords = SystemHelmKeywords(ssh_connection)
oam_ip = ConfigurationManager.get_lab_config().get_floating_ip()
addrpool_keywords = SystemAddrpoolListKeywords(ssh_connection)
mgmt_ip = addrpool_keywords.get_management_floating_address()
bind_pw = KeyringKeywords(ssh_connection).get_keyring(service="ldap", identifier="ldapadmin")
lab_ipv6 = ConfigurationManager.get_lab_config().is_ipv6()
oam_ip = f"[{oam_ip}]" if lab_ipv6 else oam_ip
# Verifies if the app is already uploaded
system_applications = SystemApplicationListKeywords(ssh_connection).get_system_application_list()
if system_applications.is_in_application_list(oidc_name):
oidc_app_status = system_applications.get_application(oidc_name).get_status()
validate_equals(oidc_app_status, "uploaded", f"{oidc_name} is already uploaded")
else:
# Setups the upload input object
system_application_upload_input = SystemApplicationUploadInput()
system_application_upload_input.set_app_name(oidc_name)
system_application_upload_input.set_tar_file_path(f"{base_path}{oidc_name}*.tgz")
# Uploads the app file and verifies it
SystemApplicationUploadKeywords(ssh_connection).system_application_upload(system_application_upload_input)
system_applications = SystemApplicationListKeywords(ssh_connection).get_system_application_list()
oidc_app_status = system_applications.get_application(oidc_name).get_status()
validate_equals(oidc_app_status, "uploaded", f"{oidc_name} upload status validation")
# Configures and applies Kubernetes OIDC service parameters
sys_service = SystemServiceKeywords(ssh_connection)
sys_service.add_service_parameter("kubernetes", "kube_apiserver oidc-client-id", "stx-oidc-client-app")
sys_service.add_service_parameter("kubernetes", "kube_apiserver oidc-groups-claim", "groups")
sys_service.add_service_parameter("kubernetes", "kube_apiserver oidc-issuer-url", f"https://{oam_ip}:30556/dex")
sys_service.add_service_parameter("kubernetes", "kube_apiserver oidc-username-claim", "email")
sys_service.apply_kubernetes_service_parameters()
# Creates and applies OIDC auth yaml
file_keywords.create_directory(file_path_oidc)
template_file = get_stx_resource_path("resources/cloud_platform/nightly_regression/oidc/oidc-auth-apps-certificate.yaml")
replacement_dictionary = {"oam_ip": oam_ip.strip("[]")}
oidc_auth_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(template_file, replacement_dictionary, "oidc-auth-apps-certificate.yaml", file_path_oidc, True)
KubectlApplyPodsKeywords(ssh_connection).apply_from_yaml(oidc_auth_yaml)
# Creates dex and local LDAP certs
dex_ca_cert = KubectlGetSecretsKeywords(ssh_connection).get_secret_with_custom_output(secret_name="system-local-ca", namespace="cert-manager", output_format="jsonpath", extra_parameters='"{.data.ca\.crt}"', base64=True)
file_keywords.create_file_with_echo("/home/sysadmin/oidc/dex-ca-cert.crt", dex_ca_cert)
KubectlCreateSecretsKeywords(ssh_connection).create_secret_generic(namespace="kube-system", secret_name="dex-ca-cert", tls_crt="/home/sysadmin/oidc/dex-ca-cert.crt")
local_ldap_ca_cert = KubectlGetSecretsKeywords(ssh_connection).get_secret_with_custom_output(secret_name="system-local-ca", namespace="cert-manager", output_format="jsonpath", extra_parameters='"{.data.ca\.crt}"', base64=True)
file_keywords.create_file_with_echo("/home/sysadmin/oidc/local-ldap-ca-cert.crt", local_ldap_ca_cert)
KubectlCreateSecretsKeywords(ssh_connection).create_secret_generic(namespace="kube-system", secret_name="local-ldap-ca-cert", tls_crt="/home/sysadmin/oidc/local-ldap-ca-cert.crt")
# Creates and applies OIDC client, secret observer and dex overrides
yaml_path = get_stx_resource_path("resources/cloud_platform/nightly_regression/oidc/oidc-client-overrides.yaml")
file_keywords.upload_file(yaml_path, f"{file_path_oidc}oidc-client-overrides.yaml", False)
helm_keywords.helm_override_update("oidc-auth-apps", "oidc-client", "kube-system", f"{file_path_oidc}oidc-client-overrides.yaml")
yaml_path = get_stx_resource_path("resources/cloud_platform/nightly_regression/oidc/secret-observer-overrides.yaml")
file_keywords.upload_file(yaml_path, f"{file_path_oidc}secret-observer-overrides.yaml", False)
helm_keywords.helm_override_update("oidc-auth-apps", "secret-observer", "kube-system", f"{file_path_oidc}secret-observer-overrides.yaml")
template_file = get_stx_resource_path("resources/cloud_platform/nightly_regression/oidc/dex-overrides.yaml")
replacement_dictionary = {"oam_ip": oam_ip, "mgmt_ip": mgmt_ip, "bind_pw": bind_pw}
dex_auth_yaml = YamlKeywords(ssh_connection).generate_yaml_file_from_template(template_file, replacement_dictionary, "dex-overrides.yaml", file_path_oidc, True)
helm_keywords.helm_override_update("oidc-auth-apps", "dex", "kube-system", dex_auth_yaml)
# Applies the app and verifies if it became applied
system_application_apply_output = SystemApplicationApplyKeywords(ssh_connection).system_application_apply(oidc_name)
system_application_object = system_application_apply_output.get_system_application_object()
validate_not_equals(system_application_object, None, f"System application object should not be None")
validate_equals(system_application_object.get_name(), oidc_name, f"Application name validation")
validate_equals(system_application_object.get_status(), SystemApplicationStatusEnum.APPLIED.value, f"Application status validation")
def test_uninstall_oidc():
"""
Uninstall (Remove and Delete) Application OIDC
Raises:
Exception: If application OIDC failed to remove or delete
"""
# Setups app configs and lab connection
app_config = ConfigurationManager.get_app_config()
oidc_name = app_config.get_oidc_app_name()
lab_connect_keywords = LabConnectionKeywords()
ssh_connection = lab_connect_keywords.get_active_controller_ssh()
# Verifies if the app is present in the system
system_applications = SystemApplicationListKeywords(ssh_connection).get_system_application_list()
validate_equals(system_applications.is_in_application_list(oidc_name), True, f"The {oidc_name} application should be uploaded/applied on the system")
# Removes the application
system_application_remove_input = SystemApplicationRemoveInput()
system_application_remove_input.set_app_name(oidc_name)
system_application_remove_input.set_force_removal(False)
system_application_output = SystemApplicationRemoveKeywords(ssh_connection).system_application_remove(system_application_remove_input)
validate_equals(system_application_output.get_system_application_object().get_status(), SystemApplicationStatusEnum.UPLOADED.value, f"Application removal status validation")
# Deletes the application
system_application_delete_input = SystemApplicationDeleteInput()
system_application_delete_input.set_app_name(oidc_name)
system_application_delete_input.set_force_deletion(False)
delete_msg = SystemApplicationDeleteKeywords(ssh_connection).get_system_application_delete(system_application_delete_input)
validate_equals(delete_msg, f"Application {oidc_name} deleted.\n", f"Application deletion message validation")

View File

@@ -16,6 +16,7 @@ def test_default_app_config():
assert default_config.get_base_application_path() == "/usr/local/share/applications/helm/", "default base path was incorrect"
assert default_config.get_istio_app_name() == "istio", "istio default app name was incorrect"
assert default_config.get_metric_server_app_name() == "metrics-server", "metric server default name was incorrect"
assert default_config.get_oidc_app_name() == "oidc-auth-apps", "oidc default app name was incorrect"
def test_custom_app_config():
@@ -33,3 +34,4 @@ def test_custom_app_config():
assert custom_config.get_base_application_path() == "fake_path", "custom base path was incorrect"
assert custom_config.get_istio_app_name() == "istio_custom", "istio custom app name was incorrect"
assert custom_config.get_metric_server_app_name() == "metrics-server_custom", "metric server custom name was incorrect"
assert custom_config.get_oidc_app_name() == "oidc-auth-apps_custom", "oidc custom app name was incorrect"

View File

@@ -1,5 +1,6 @@
{
base_application_path: "fake_path",
istio_app_name: "istio_custom",
metric_server_app_name: "metrics-server_custom"
metric_server_app_name: "metrics-server_custom",
oidc_app_name: "oidc-auth-apps_custom"
}