Fix subcloud kube-root-ca certificate in-sync after rehome

After a subcloud rehome operation, the kube-root-ca certificate
was left out of sync. This change ensures that the certificate
is properly synchronized post-rehome and the subcloud is reported
as in-sync.

Test Plan:
PASS: Rehome a subcloud and verify kube-root-ca certificate is in-sync
PASS: Verify rehomed subcloud is in-sync, managed, and available

Change-Id: I1d951afae2b8309cb0a127c7eefc40e6f728ea35
Signed-off-by: aabhinav <ayyapasetti.abhinav@windriver.com>
This commit is contained in:
aabhinav
2025-10-03 13:46:04 -04:00
parent 7a3501b980
commit e822412851
5 changed files with 383 additions and 11 deletions

View File

@@ -0,0 +1,126 @@
from typing import Optional
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals_with_retry
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.cloud_platform.dcmanager.objects.dcmanager_kube_rootca_update_strategy_show_output import DcmanagerKubeRootcaUpdateStrategyShowOutput
class DcmanagerKubeRootcaUpdateStrategyKeywords(BaseKeyword):
"""
This class executes kube-rootca-update-strategy commands
"""
def __init__(self, ssh_connection: SSHConnection) -> None:
"""
Constructor
Args:
ssh_connection (SSHConnection): The SSH connection object used for executing commands.
"""
self.ssh_connection = ssh_connection
def wait_kube_upgrade(self, expected_status: str, check_interval: int = 30, timeout: int = 240) -> None:
"""
Waits for kube upgrade method to return True.
"""
def check_kube_deployment() -> str:
"""
Checks if the kube upgrade operation has been completed, either 'initial' or 'apply'.
Returns:
str: Expected status for kube deployment.
"""
kube_deployment_status = self.get_dcmanager_kube_rootca_update_strategy_step_show().get_dcmanager_kube_rootca_update_strategy_step_show().get_state()
return kube_deployment_status
validate_equals_with_retry(
function_to_execute=check_kube_deployment,
expected_value=expected_status,
validation_description=f"Waiting for sw_deployment_status {expected_status}.",
timeout=timeout,
polling_sleep_time=check_interval,
)
def dcmanager_kube_rootca_update_strategy_create(self, subcloud_apply_type: Optional[str] = None, max_parallel_subclouds: Optional[str] = None, stop_on_failure: Optional[bool] = None, group: Optional[str] = None, subject: Optional[str] = None, expiry_date: Optional[str] = None, cert_file: Optional[str] = None, force: Optional[bool] = None) -> DcmanagerKubeRootcaUpdateStrategyShowOutput:
"""
Create kube-rootca-update-strategy
Args:
subcloud_apply_type (Optional[str]): Subcloud apply type.
max_parallel_subclouds (Optional[str]): Maximum parallel subclouds.
stop_on_failure (Optional[bool]): Stop on failure.
group (Optional[str]): Group.
subject (Optional[str]): Subject.
expiry_date (Optional[str]): Expiry date.
cert_file (Optional[str]): Certificate file path.
force (Optional[bool]): Force flag.
Returns:
DcmanagerKubeRootcaUpdateStrategyShowOutput: An object containing details of the kubernetes strategy .
"""
cmd = "dcmanager kube-rootca-update-strategy create"
if subcloud_apply_type:
cmd += f" --subcloud-apply-type {subcloud_apply_type}"
if max_parallel_subclouds:
cmd += f" --max-parallel-subclouds {max_parallel_subclouds}"
if stop_on_failure:
cmd += " --stop-on-failure"
if group:
cmd += f" --group {group}"
if subject:
cmd += f" --subject {subject}"
if expiry_date:
cmd += f" --expiry-date {expiry_date}"
if cert_file:
cmd += f" --cert-file {cert_file}"
if force:
cmd += " --force"
output = self.ssh_connection.send(source_openrc(cmd))
self.validate_success_return_code(self.ssh_connection)
self.wait_kube_upgrade(expected_status="initial")
return DcmanagerKubeRootcaUpdateStrategyShowOutput(output)
def dcmanager_kube_rootca_update_strategy_apply(self) -> DcmanagerKubeRootcaUpdateStrategyShowOutput:
"""
Apply kube-rootca-update-strategy
Returns:
DcmanagerKubeRootcaUpdateStrategyShowOutput: An object containing details of the kubernetes strategy .
"""
cmd = "dcmanager kube-rootca-update-strategy apply"
output = self.ssh_connection.send(source_openrc(cmd))
self.validate_success_return_code(self.ssh_connection)
self.wait_kube_upgrade(expected_status="complete", check_interval=60, timeout=1800)
return DcmanagerKubeRootcaUpdateStrategyShowOutput(output)
def get_dcmanager_kube_rootca_update_strategy_step_show(self) -> DcmanagerKubeRootcaUpdateStrategyShowOutput:
"""
Gets the dcmanager kube-rootca-update-strategy show
Returns:
DcmanagerKubeRootcaUpdateStrategyShowOutput: An object containing details of the strategy step.
"""
cmd = "dcmanager kube-rootca-update-strategy show"
output = self.ssh_connection.send(source_openrc(cmd))
self.validate_success_return_code(self.ssh_connection)
return DcmanagerKubeRootcaUpdateStrategyShowOutput(output)
def dcmanager_kube_rootca_update_strategy_delete(self) -> DcmanagerKubeRootcaUpdateStrategyShowOutput:
"""
Delete kube-rootca-update-strategy
Returns:
DcmanagerKubeRootcaUpdateStrategyShowOutput: An object containing details of the kubernetes strategy.
"""
cmd = "dcmanager kube-rootca-update-strategy delete"
output = self.ssh_connection.send(source_openrc(cmd))
self.validate_success_return_code(self.ssh_connection)
return DcmanagerKubeRootcaUpdateStrategyShowOutput(output)

View File

@@ -0,0 +1,140 @@
class DcmanagerKubeRootcaUpdateStrategyShowObject:
"""
This class represents a dcmanager kube-rootca-update-strategy as an object.
"""
def __init__(self) -> None:
"""Initializes a DcmanagerKubeRootcaUpdateStrategyShowObject with default values."""
self.strategy_type: str = None
self.subcloud_apply_type: str = None
self.max_parallel_subclouds: str = None
self.stop_on_failure: str = None
self.state: str = None
self.created_at: str = None
self.updated_at: str = None
def set_strategy_type(self, strategy_type: str):
"""
Sets the strategy type of the kube-rootca-update-strategy.
Args:
strategy_type (str): The strategy type to set.
"""
self.strategy_type = strategy_type
def get_strategy_type(self) -> str:
"""
Gets the strategy type of the kube-rootca-update-strategy.
Returns:
str: The strategy type.
"""
return self.strategy_type
def set_subcloud_apply_type(self, subcloud_apply_type: str):
"""
Sets the subcloud apply type of the kube-rootca-update-strategy.
Args:
subcloud_apply_type (str): The subcloud apply type to set.
"""
self.subcloud_apply_type = subcloud_apply_type
def get_subcloud_apply_type(self) -> str:
"""
Gets the subcloud apply type of the kube-rootca-update-strategy.
Returns:
str: The subcloud apply type.
"""
return self.subcloud_apply_type
def set_max_parallel_subclouds(self, max_parallel_subclouds: str):
"""
Sets the max parallel subclouds of the kube-rootca-update-strategy.
Args:
max_parallel_subclouds (str): The max parallel subclouds to set.
"""
self.max_parallel_subclouds = max_parallel_subclouds
def get_max_parallel_subclouds(self) -> str:
"""
Gets the max parallel subclouds of the kube-rootca-update-strategy.
Returns:
str: The max parallel subclouds.
"""
return self.max_parallel_subclouds
def set_stop_on_failure(self, stop_on_failure: str):
"""
Sets the stop on failure of the kube-rootca-update-strategy.
Args:
stop_on_failure (str): The stop on failure to set.
"""
self.stop_on_failure = stop_on_failure
def get_stop_on_failure(self) -> str:
"""
Gets the stop on failure of the kube-rootca-update-strategy.
Returns:
str: The stop on failure.
"""
return self.stop_on_failure
def set_state(self, state: str):
"""
Sets the state of the kube-rootca-update-strategy.
Args:
state (str): The state to set.
"""
self.state = state
def get_state(self) -> str:
"""
Gets the state of the kube-rootca-update-strategy.
Returns:
str: The state.
"""
return self.state
def set_created_at(self, created_at: str):
"""
Sets the creation timestamp of the kube-rootca-update-strategy.
Args:
created_at (str): The creation timestamp to set.
"""
self.created_at = created_at
def get_created_at(self) -> str:
"""
Gets the creation timestamp of the kube-rootca-update-strategy.
Returns:
str: The creation timestamp.
"""
return self.created_at
def set_updated_at(self, updated_at: str):
"""
Sets the updated timestamp of the kube-rootca-update-strategy.
Args:
updated_at (str): The updated timestamp to set.
"""
self.updated_at = updated_at
def get_updated_at(self) -> str:
"""
Gets the updated timestamp of the kube-rootca-update-strategy.
Returns:
str: The updated timestamp.
"""
return self.updated_at

View File

@@ -0,0 +1,59 @@
from typing import Dict
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from keywords.cloud_platform.dcmanager.dcmanager_vertical_table_parser import DcManagerVerticalTableParser
from keywords.cloud_platform.dcmanager.objects.dcmanager_kube_rootca_update_strategy_show_object import DcmanagerKubeRootcaUpdateStrategyShowObject
class DcmanagerKubeRootcaUpdateStrategyShowOutput:
"""
Parses the output of the 'dcmanager kube-rootca-update-strategy' command into a DcmanagerKubeRootcaUpdateStrategyShowObject instance.
"""
def __init__(self, kube_strategy: str) -> None:
"""
Initializes DcmanagerKubeRootcaUpdateStrategyShowObject.
Args:
kube_strategy (str): Output of the 'kube-rootca-update-strategy' command.
Raises:
KeywordException: If the output format is invalid.
"""
dc_vertical_table_parser = DcManagerVerticalTableParser(kube_strategy)
output_values = dc_vertical_table_parser.get_output_values_dict()
if self.is_valid_output(output_values):
self.dcmanager_kube_rootca_update_strategy_step = DcmanagerKubeRootcaUpdateStrategyShowObject()
self.dcmanager_kube_rootca_update_strategy_step.set_state(output_values["state"])
else:
raise KeywordException(f"The output line {output_values} was not valid")
def get_dcmanager_kube_rootca_update_strategy_step_show(self) -> DcmanagerKubeRootcaUpdateStrategyShowObject:
"""
Retrieves the parsed dcmanager kube-rootca-update-strategy show object.
Returns:
DcmanagerKubeRootcaUpdateStrategyShowObject: The parsed dcmanager kube-rootca-update-strategy show object.
"""
return self.dcmanager_kube_rootca_update_strategy_step
@staticmethod
def is_valid_output(value: Dict[str, str]) -> bool:
"""
Checks if the output contains all the required fields.
Args:
value (Dict[str, str]): The dictionary of output values.
Returns:
bool: True if all required fields are present, False otherwise.
"""
required_fields = ["strategy type", "subcloud apply type", "max parallel subclouds", "stop on failure", "state", "created_at", "updated_at"]
for field in required_fields:
if field not in value:
get_logger().log_error(f"{field} is not in the output value")
return False
return True

View File

@@ -1,10 +1,10 @@
import time
import math
import time
from framework.exceptions.keyword_exception import KeywordException
from framework.logging.automation_logger import get_logger
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_greater_than, validate_equals
from framework.validation.validation import validate_equals
from keywords.base_keyword import BaseKeyword
@@ -178,9 +178,8 @@ class FileKeywords(BaseKeyword):
grep_pattern (str): Pattern to be searched.
Returns:
matches (int): Number of matches found.
int: Number of matches found.
"""
matches = int(self.ssh_connection.send(f"tar -tf {file_path} | grep {grep_pattern} | wc -l")[0].strip("\n"))
return matches
@@ -210,6 +209,29 @@ class FileKeywords(BaseKeyword):
get_logger().log_error(f"Failed to check file existence at {path}: {e}")
raise KeywordException(f"Failed to check file existence at {path}: {e}")
def concatenate_files_with_sudo(self, file1_path: str, file2_path: str, output_path: str) -> bool:
"""
Concatenate two files and store the result in a specified location using sudo.
Args:
file1_path (str): Path to the first file.
file2_path (str): Path to the second file.
output_path (str): Path where the concatenated result should be stored.
Returns:
bool: True if concatenation is successful, False otherwise.
Raises:
KeywordException: If there is an error executing the command.
"""
try:
cmd = f"cat {file1_path} {file2_path} > {output_path}"
self.ssh_connection.send_as_sudo(cmd)
return self.validate_file_exists_with_sudo(output_path)
except Exception as e:
get_logger().log_error(f"Failed to concatenate files {file1_path} and {file2_path} to {output_path}: {e}")
raise KeywordException(f"Failed to concatenate files {file1_path} and {file2_path} to {output_path}: {e}")
def create_directory(self, dir_path: str) -> bool:
"""
Create a directory if it does not already exist (non-sudo).
@@ -362,14 +384,14 @@ class FileKeywords(BaseKeyword):
"""
self.ssh_connection.send(f"cp {src_file} {dest_file}")
def create_file_to_fill_disk_space(self, dest_dir: str = "/home/sysadmin"):
def create_file_to_fill_disk_space(self, dest_dir: str = "/home/sysadmin") -> str:
"""Creates a file with the available space of the desired directory.
Args:
dest_dir (str): Directory where the file is created. Default to home dir.
Returns:
path_to_file (str): Created file path.
str: Created file path.
"""
available_space = self.ssh_connection.send(f"echo $(($(stat -f --format=\"%a*%S\" {dest_dir})))| awk '{{print $1 / (1024*1024*1024) }}'")[0].strip("\n")
rounded_size = math.ceil(float(available_space))

View File

@@ -4,6 +4,7 @@ from config.configuration_manager import ConfigurationManager
from framework.logging.automation_logger import get_logger
from framework.ssh.ssh_connection import SSHConnection
from framework.validation.validation import validate_equals
from keywords.cloud_platform.dcmanager.dcmanager_kube_rootca_update_strategy_keywords import DcmanagerKubeRootcaUpdateStrategyKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_add_keywords import DcManagerSubcloudAddKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_delete_keywords import DcManagerSubcloudDeleteKeywords
from keywords.cloud_platform.dcmanager.dcmanager_subcloud_list_keywords import DcManagerSubcloudListKeywords
@@ -17,7 +18,7 @@ from keywords.cloud_platform.version_info.cloud_platform_version_manager import
from keywords.files.file_keywords import FileKeywords
def verify_software_release(ssh_connection: SSHConnection):
def verify_software_release(ssh_connection: SSHConnection) -> None:
"""
Verify that the software release image is available on the given system controller.
@@ -34,7 +35,7 @@ def verify_software_release(ssh_connection: SSHConnection):
validate_equals(is_sig_exist, True, f"Sig file exists in path {path}.")
def update_subcloud_assets(ssh_connection: SSHConnection, subcloud_bootstrap_values: str, subcloud_install_values: str, systemcontroller_gateway_address: str):
def update_subcloud_assets(ssh_connection: SSHConnection, subcloud_bootstrap_values: str, subcloud_install_values: str, systemcontroller_gateway_address: str) -> None:
"""
Update the subcloud assets files before rehome.
@@ -53,7 +54,7 @@ def update_subcloud_assets(ssh_connection: SSHConnection, subcloud_bootstrap_val
yaml_file.upload_file(file, subcloud_install_values)
def sync_deployment_assets_between_system_controllers(origin_ssh_connection: SSHConnection, destination_ssh_connection: SSHConnection, subcloud_name: str, subcloud_bootstrap_values: str, subcloud_install_values: str):
def sync_deployment_assets_between_system_controllers(origin_ssh_connection: SSHConnection, destination_ssh_connection: SSHConnection, subcloud_name: str, subcloud_bootstrap_values: str, subcloud_install_values: str) -> None:
"""
Synchronize deployment assets files for a given subcloud between two system controllers.
@@ -71,7 +72,29 @@ def sync_deployment_assets_between_system_controllers(origin_ssh_connection: SSH
update_subcloud_assets(destination_ssh_connection, subcloud_bootstrap_values, subcloud_install_values, systemcontroller_gateway_address)
def perform_rehome_operation(origin_ssh_connection: SSHConnection, destination_ssh_connection: SSHConnection, subcloud_name: str, subcloud_bootstrap_values: str, subcloud_install_values: str):
def get_subcloud_in_sync(ssh_connection: SSHConnection, subcloud_name: str) -> None:
"""
Ensure that the specified subcloud reaches 'in-sync' status.
Args:
ssh_connection (SSHConnection): SSH connection to the target system controller.
subcloud_name (str): Name of the subcloud to make it in-sync.
"""
file_keywords = FileKeywords(ssh_connection)
path1 = "/etc/kubernetes/pki/ca.crt"
path2 = "/etc/kubernetes/pki/ca.key"
is_crt_exist = file_keywords.validate_file_exists_with_sudo(path1)
validate_equals(is_crt_exist, True, f"Crt file exists in path {path1}.")
is_key_exist = file_keywords.validate_file_exists_with_sudo(path2)
validate_equals(is_key_exist, True, f"Key file exists in path {path2}.")
file_keywords.concatenate_files_with_sudo(path1, path2, "/tmp/ca.pem")
dcm_krc_update_strategy = DcmanagerKubeRootcaUpdateStrategyKeywords(ssh_connection)
dcm_krc_update_strategy.dcmanager_kube_rootca_update_strategy_create(cert_file="/tmp/ca.pem")
dcm_krc_update_strategy.dcmanager_kube_rootca_update_strategy_apply()
DcManagerSubcloudListKeywords(ssh_connection).validate_subcloud_sync_status(subcloud_name, expected_sync_status="in-sync")
def perform_rehome_operation(origin_ssh_connection: SSHConnection, destination_ssh_connection: SSHConnection, subcloud_name: str, subcloud_bootstrap_values: str, subcloud_install_values: str) -> None:
"""
Rehome a subcloud from the origin system controller to the destination system controller.
@@ -82,7 +105,6 @@ def perform_rehome_operation(origin_ssh_connection: SSHConnection, destination_s
subcloud_bootstrap_values (str): Path to the subcloud bootstrap values file.
subcloud_install_values (str): Path to the subcloud install values file.
"""
# Ensure software image load is available on destination system controller.
verify_software_release(destination_ssh_connection)
@@ -103,6 +125,9 @@ def perform_rehome_operation(origin_ssh_connection: SSHConnection, destination_s
get_logger().log_info(f"Deleting subcloud from {origin_ssh_connection}")
DcManagerSubcloudDeleteKeywords(origin_ssh_connection).dcmanager_subcloud_delete(subcloud_name)
get_logger().log_info(f"Getting subcloud {subcloud_name} in-sync on {destination_ssh_connection}")
get_subcloud_in_sync(destination_ssh_connection, subcloud_name)
@mark.p2
@mark.lab_has_subcloud