Merge "add test deploy, abort, and resume functionality"

This commit is contained in:
Zuul
2025-04-29 15:21:42 +00:00
committed by Gerrit Code Review
3 changed files with 141 additions and 41 deletions

View File

@@ -41,11 +41,12 @@ class DCManagerSubcloudDeployKeywords(BaseKeyword):
self.ssh_connection.send(source_openrc(cmd))
self.validate_success_return_code(self.ssh_connection)
def dcmanager_subcloud_deploy_create(self, subcloud_name: str):
def dcmanager_subcloud_deploy_create(self, subcloud_name: str, wait_for_status: bool = True):
"""Creates the subcloud using 'dcmanager subcloud deploy create'.
Args:
subcloud_name (str): a str name for the subcloud.
wait_for_status (bool): wheteher to check for status or not default is True
"""
# Get the subcloud config
sc_config = ConfigurationManager.get_lab_config().get_subcloud(subcloud_name)
@@ -64,15 +65,17 @@ class DCManagerSubcloudDeployKeywords(BaseKeyword):
self.validate_success_return_code(self.ssh_connection)
# validate subcloud status until complete
success_status = "create-complete"
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(self.ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, success_status)
if wait_for_status:
success_status = "create-complete"
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(self.ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, success_status)
def dcmanager_subcloud_deploy_install(self, subcloud_name: str):
def dcmanager_subcloud_deploy_install(self, subcloud_name: str, wait_for_status: bool = True):
"""Installs the subcloud using 'dcmanager subcloud deploy install'.
Args:
subcloud_name (str): a str name for the subcloud.
wait_for_status (bool): wheteher to check for status or not default is True
"""
# Get the subcloud config
deployment_assets_config = ConfigurationManager.get_deployment_assets_config()
@@ -88,15 +91,17 @@ class DCManagerSubcloudDeployKeywords(BaseKeyword):
self.validate_success_return_code(self.ssh_connection)
# validate subcloud status until complete
success_status = "install-complete"
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(self.ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, success_status)
if wait_for_status:
success_status = "install-complete"
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(self.ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, success_status)
def dcmanager_subcloud_deploy_bootstrap(self, subcloud_name: str):
def dcmanager_subcloud_deploy_bootstrap(self, subcloud_name: str, wait_for_status: bool = True):
"""Bootstraps the subcloud using 'dcmanager subcloud deploy bootstrap'.
Args:
subcloud_name (str): a str name for the subcloud.
wait_for_status (bool): wheteher to check for status or not default is True
"""
# Get the subcloud config
deployment_assets_config = ConfigurationManager.get_deployment_assets_config()
@@ -114,15 +119,17 @@ class DCManagerSubcloudDeployKeywords(BaseKeyword):
self.validate_success_return_code(self.ssh_connection)
# validate subcloud status until complete
success_status = "bootstrap-complete"
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(self.ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, success_status)
if wait_for_status:
success_status = "bootstrap-complete"
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(self.ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, success_status)
def dcmanager_subcloud_deploy_config(self, subcloud_name: str):
def dcmanager_subcloud_deploy_config(self, subcloud_name: str, wait_for_status: bool = True):
"""Configures the subcloud using 'dcmanager subcloud deploy config'.
Args:
subcloud_name (str): a str name for the subcloud.
wait_for_status (bool): wheteher to check for status or not default is True
"""
# Get the subcloud config
deployment_assets_config = ConfigurationManager.get_deployment_assets_config()
@@ -138,6 +145,36 @@ class DCManagerSubcloudDeployKeywords(BaseKeyword):
self.validate_success_return_code(self.ssh_connection)
# validate subcloud status until complete
success_status = "complete"
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(self.ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, success_status)
if wait_for_status:
success_status = "complete"
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(self.ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, success_status)
def dcmanager_subcloud_deploy_resume(self, subcloud_name: str, wait_for_status: bool = True):
"""Creates the subcloud using 'dcmanager subcloud deploy resume'.
Args:
subcloud_name (str): a str name for the subcloud.
wait_for_status (bool): wheteher to check for status or not default is True
"""
# Get the subcloud config
sc_config = ConfigurationManager.get_lab_config().get_subcloud(subcloud_name)
# Get the subcloud deployment assets
deployment_assets_config = ConfigurationManager.get_deployment_assets_config()
sc_assets = deployment_assets_config.get_subcloud_deployment_assets(subcloud_name)
bootstrap_file = sc_assets.get_bootstrap_file()
deploy_file = sc_assets.get_deployment_config_file()
admin_creds = sc_config.get_admin_credentials()
# Get the subcloud bootstrap address
boot_add = sc_config.get_first_controller().get_ip()
# Execute the command
cmd = f"dcmanager subcloud deploy resume {subcloud_name} --bootstrap-address {boot_add} --bootstrap-values {bootstrap_file} --deploy-config {deploy_file} --sysadmin-password {admin_creds.get_password()} --bmc-password {sc_config.get_bm_password()} "
self.ssh_connection.send(source_openrc(cmd))
self.validate_success_return_code(self.ssh_connection)
# validate subcloud status until complete
if wait_for_status:
success_status = "complete"
dc_manager_sc_list_kw = DcManagerSubcloudListKeywords(self.ssh_connection)
dc_manager_sc_list_kw.validate_subcloud_status(subcloud_name, success_status)

View File

@@ -213,7 +213,7 @@ class DcManagerSubcloudListOutput:
return min(subclouds, key=lambda subcloud: int(subcloud.get_id()))
def get_undeployed_subcloud_name(self, lab_type: str) -> str:
def get_undeployed_subcloud_name(self, lab_type: str = None) -> str:
"""
Fetch the name of the first subcloud that is not deployed.
@@ -226,10 +226,7 @@ class DcManagerSubcloudListOutput:
lab_type (str): The type of the lab (e.g., 'simplex', 'duplex').
Returns:
str: The name of the first subcloud that is not deployed.
Raises:
ValueError: If no suitable subcloud is found.
str: The name of the first subcloud that is not deployed or None.
"""
# fetch all the subclouds from the system
deployed_sc_names = [i.get_name() for i in self.dcmanager_subclouds]
@@ -238,16 +235,13 @@ class DcManagerSubcloudListOutput:
# fetch all the subclouds from the lab config
sc_names_from_config = lab_config.get_subclouds_name_by_type(lab_type)
# raise if there is no subcloud in the config for the given type
# return None if there is no subcloud in the config for the given type
if not sc_names_from_config:
error_message = f"No Lab found with Type: {lab_type} in the configuration."
get_logger().log_exception(error_message)
raise ValueError(error_message)
return None
all_free_sc = set(sc_names_from_config).difference(set(deployed_sc_names))
if not all_free_sc:
error_message = "No subcloud found in the configuration that is not deployed."
get_logger().log_exception(error_message)
raise ValueError(error_message)
return None
return all_free_sc.pop()

View File

@@ -1,6 +1,9 @@
import time
from pytest import mark
from framework.logging.automation_logger import get_logger
from framework.ssh.ssh_connection import SSHConnection
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_delete_keywords import DcManagerSubcloudDeleteKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_deploy_keywords import DCManagerSubcloudDeployKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords
@@ -8,6 +11,39 @@ from keywords.cloud_platform.dcmanager.dcmanager_subcloud_manager_keywords impor
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
def get_undeployed_subcloud_name(ssh_connection: SSHConnection) -> str:
"""Function get undeployed subcloud name
function to get undeployed subcloud name if no undeployed
subcloud found then delete the deployed one and return
Args:
ssh_connection (SSHConnection): SSH connection object.
Returns:
str: subcloudname
"""
dcm_sc_list_kw = DcManagerSubcloudListKeywords(ssh_connection)
subcloud_name = dcm_sc_list_kw.get_dcmanager_subcloud_list().get_undeployed_subcloud_name()
if subcloud_name is None:
get_logger().log_info("No Undeployed Subcloud found deleting existing one")
# Gets the lowest subcloud (the subcloud with the lowest id).
get_logger().log_info("Obtaining subcloud with the lowest ID.")
lowest_subcloud = dcm_sc_list_kw.get_dcmanager_subcloud_list().get_healthy_subcloud_with_lowest_id()
get_logger().log_info(f"Subcloud with the lowest ID obtained: ID={lowest_subcloud.get_id()}, Name={lowest_subcloud.get_name()}, Management state={lowest_subcloud.get_management()}")
subcloud_name = lowest_subcloud.get_name()
dcm_sc_manager_kw = DcManagerSubcloudManagerKeywords(ssh_connection)
# poweroff the subcloud.
get_logger().log_test_case_step(f"Poweroff subcloud={subcloud_name}.")
dcm_sc_manager_kw.set_subcloud_poweroff(subcloud_name)
# delete the subcloud.
dcm_sc_del_kw = DcManagerSubcloudDeleteKeywords(ssh_connection)
dcm_sc_del_kw.dcmanager_subcloud_delete(subcloud_name)
return subcloud_name
@mark.p0
@mark.lab_has_min_2_subclouds
def test_phased_deployment(request):
@@ -16,17 +52,18 @@ def test_phased_deployment(request):
Test Steps:
- log onto system controller
- get the ssh connection to the active controller
- check first to see if there is an undeployed subcloud if yes run test
- If we don't have a subcloud that is undeployed, remove it and then run the test.
- execute the phased deployment create
- execute the phased deployment install
- execute the phased deployment bootstrap
- execute the phased deployment config
- execute the phased deployment complete
- delete the subcloud
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
dcm_sc_list_kw = DcManagerSubcloudListKeywords(ssh_connection)
subcloud_name = dcm_sc_list_kw.get_dcmanager_subcloud_list().get_undeployed_subcloud_name(None)
subcloud_name = get_undeployed_subcloud_name(ssh_connection)
dcm_sc_deploy_kw = DCManagerSubcloudDeployKeywords(ssh_connection)
# dcmanager subcloud deploy create
get_logger().log_info(f"dcmanager subcloud deploy create {subcloud_name}.")
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_create(subcloud_name)
@@ -43,14 +80,46 @@ def test_phased_deployment(request):
get_logger().log_info(f"dcmanager subcloud deploy config {subcloud_name}.")
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_config(subcloud_name)
def teardown():
get_logger().log_info("Phased Deployment Teardown")
dcm_sc_manager_kw = DcManagerSubcloudManagerKeywords(ssh_connection)
# poweroff the subcloud.
get_logger().log_test_case_step(f"Poweroff subcloud={subcloud_name}.")
dcm_sc_manager_kw.set_subcloud_poweroff(subcloud_name)
# delete the subcloud.
dcm_sc_del_kw = DcManagerSubcloudDeleteKeywords(ssh_connection)
dcm_sc_del_kw.dcmanager_subcloud_delete(subcloud_name)
request.addfinalizer(teardown)
@mark.p0
@mark.lab_has_min_2_subclouds
def test_subcloud_deploy_abort_resume(request):
"""Test Phased deployment steps for subcloud deployment.
Test Steps:
- log onto system controller
- get the ssh connection to the active controller
- check first to see if there is an undeployed subcloud if yes run test
- If we don't have a subcloud that is undeployed, remove it and then run the test.
- execute the phased deployment create
- execute the phased deployment install
- execute the phased deployment bootstrap
- execute the phased deployment abort in some between 10 to 15 mins
- execute the phased deployment resume
"""
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
subcloud_name = get_undeployed_subcloud_name(ssh_connection)
dcm_sc_deploy_kw = DCManagerSubcloudDeployKeywords(ssh_connection)
# dcmanager subcloud deploy create
get_logger().log_info(f"dcmanager subcloud deploy create {subcloud_name}.")
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_create(subcloud_name)
# dcmanager subcloud deploy install
get_logger().log_info(f"dcmanager subcloud deploy install {subcloud_name}.")
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_install(subcloud_name)
# dcmanager subcloud deploy bootstrap
get_logger().log_info(f"dcmanager subcloud deploy bootstrap {subcloud_name}.")
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_bootstrap(subcloud_name, wait_for_status=False)
# sleep 10 mins for bootstrap to proccess
# TODO: Find a better way than random sleep , EX tail log for x number of line
sleep_time = 600
get_logger().log_info(f"Sleeping for {sleep_time/60} Minutes.")
time.sleep(sleep_time)
# dcmanager subcloud deploy bootstrap
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_abort(subcloud_name)
get_logger().log_info(f"dcmanager subcloud deploy abort {subcloud_name}.")
dcm_sc_list_kw = DcManagerSubcloudListKeywords(ssh_connection)
dcm_sc_list_kw.validate_subcloud_status(subcloud_name, "bootstrap-aborted")
# dcmanager subcloud deploy resume
dcm_sc_deploy_kw.dcmanager_subcloud_deploy_resume(subcloud_name)