diff --git a/pytest_fixtures/component/satellite_auth.py b/pytest_fixtures/component/satellite_auth.py index 465f85147a7..1c3c5bd739f 100644 --- a/pytest_fixtures/component/satellite_auth.py +++ b/pytest_fixtures/component/satellite_auth.py @@ -14,7 +14,8 @@ LDAP_ATTR, LDAP_SERVER_TYPE, ) -from robottelo.hosts import IPAHost, SSOHost +from robottelo.hosts import IPAHost, RHBKHost, RHSSOHost +from robottelo.logging import logger from robottelo.utils.datafactory import gen_string from robottelo.utils.installer import InstallerCommand @@ -24,9 +25,11 @@ @pytest.fixture(scope='module') def default_sso_host(request, module_target_sat): """Returns default sso host""" - if hasattr(request, 'param'): - return SSOHost(module_target_sat, rhbk=request.param) - return SSOHost(module_target_sat) + if hasattr(request, 'param') and request.param: + logger.info("Using RHBK host for SSO") + return RHBKHost(module_target_sat) + logger.info("Using RHSSO host for SSO") + return RHSSOHost(module_target_sat) @pytest.fixture(scope='module') @@ -346,7 +349,7 @@ def enable_external_auth_rhsso( enroll_configure_rhsso_external_auth, default_sso_host, module_target_sat ): """register the satellite with RH-SSO Server for single sign-on""" - client_id = default_sso_host.get_rhsso_client_id() + client_id = default_sso_host.get_sso_client_id() default_sso_host.create_mapper(GROUP_MEMBERSHIP_MAPPER, client_id) audience_mapper = copy.deepcopy(AUDIENCE_MAPPER) audience_mapper['config']['included.client.audience'] = audience_mapper['config'][ diff --git a/robottelo/config/validators.py b/robottelo/config/validators.py index 47b13ba41a0..c2999dde8ff 100644 --- a/robottelo/config/validators.py +++ b/robottelo/config/validators.py @@ -304,6 +304,18 @@ must_exist=True, ), ], + rhbk=[ + Validator( + 'rhbk.host_name', + 'rhbk.host_port', + 'rhbk.host_url', + 'rhbk.rhbk_user', + 'rhbk.rhbk_password', + 'rhbk.realm', + 'rhbk.totp_secret', + must_exist=True, + ), + ], remotedb=[ Validator( 'remotedb.server', diff --git a/robottelo/hosts.py b/robottelo/hosts.py index f42f2d49065..ce2ec05e1d8 100644 --- a/robottelo/hosts.py +++ b/robottelo/hosts.py @@ -2421,40 +2421,19 @@ def run_orphan_cleanup(self, smart_proxy_id=None): class SSOHost(Host): - """Class for RHSSO functions and setup""" + """Class for SSO functions and setup""" - def __init__(self, sat_obj, rhbk=False, **kwargs): + def __init__(self, sat_obj, **kwargs): self.satellite = sat_obj - if rhbk: - self.rhbk = True - self.host_url = settings.rhbk.host_url - self.host_name = settings.rhbk.host_name - self.host_port = settings.rhbk.host_port - self.realm = settings.rhbk.realm - self.user = settings.rhbk.rhbk_user - self.password = settings.rhbk.rhbk_password - self.kcadm = RHBK_CLI - kwargs['hostname'] = kwargs.get('hostname', settings.rhbk.host_name) - else: - self.rhbk = False - self.host_url = settings.rhsso.host_url - self.host_name = settings.rhsso.host_name - self.host_port = 443 - self.realm = settings.rhsso.realm - self.user = settings.rhsso.rhsso_user - self.password = settings.rhsso.rhsso_password - self.kcadm = KEY_CLOAK_CLI - kwargs['hostname'] = kwargs.get('hostname', settings.rhsso.host_name) kwargs['ipv6'] = kwargs.get('ipv6', settings.server.is_ipv6) super().__init__(**kwargs) - def get_rhsso_client_id(self): + def get_sso_client_id(self): """getter method for fetching the client id and can be used other functions""" client_name = f'{self.satellite.hostname}-foreman-openidc' - uri = self.host_url if self.rhbk else self.host_url.replace("https://", "http://") self.execute( f'{self.kcadm} config credentials ' - f'--server {uri}/auth ' + f'--server {self.uri}/auth ' f'--realm {self.realm} ' f'--user {self.user} ' f'--password {self.password}' @@ -2470,21 +2449,21 @@ def get_rhsso_client_id(self): return client_id @lru_cache - def get_rhsso_user_details(self, username): + def get_sso_user_details(self, username): """Getter method to receive the user id""" result = self.execute(f"{self.kcadm} get users -r {self.realm} -q username={username}") result_json = json.loads(result.stdout) return result_json[0] @lru_cache - def get_rhsso_groups_details(self, group_name): + def get_sso_groups_details(self, group_name): """Getter method to receive the group id""" result = self.execute(f"{self.kcadm} get groups -r {self.realm}") group_list = json.loads(result.stdout) query_group = [group for group in group_list if group['name'] == group_name] return query_group[0] - def upload_rhsso_entity(self, json_content, entity_name): + def upload_sso_entity(self, json_content, entity_name): """Helper method to upload the RHSSO entity file on RHSSO Server. Overwrites already existing file with the same name. """ @@ -2499,13 +2478,13 @@ def upload_rhsso_entity(self, json_content, entity_name): def create_mapper(self, json_content, client_id): """Helper method to create the RH-SSO Client Mapper""" - self.upload_rhsso_entity(json_content, "mapper_file") + self.upload_sso_entity(json_content, "mapper_file") self.execute( f'{self.kcadm} create clients/{client_id}/protocol-mappers/models -r ' f'{self.realm} -f {"mapper_file"}' ) - def create_new_rhsso_user(self, username=None): + def create_new_sso_user(self, username=None): """create new user in RHSSO instance and set the password""" update_data_user = Box(RHSSO_NEW_USER) update_data_pass = Box(RHSSO_RESET_PASSWORD) @@ -2514,32 +2493,32 @@ def create_new_rhsso_user(self, username=None): update_data_user.username = username update_data_user.email = username + random.choice(valid_emails_list()) update_data_pass.value = self.password - self.upload_rhsso_entity(update_data_user, "create_user") - self.upload_rhsso_entity(update_data_pass, "reset_password") + self.upload_sso_entity(update_data_user, "create_user") + self.upload_sso_entity(update_data_pass, "reset_password") self.execute(f"{self.kcadm} create users -r {self.realm} -f create_user") - user_details = self.get_rhsso_user_details(update_data_user.username) + user_details = self.get_sso_user_details(update_data_user.username) self.execute( f'{self.kcadm} update -r {self.realm} ' f'users/{user_details["id"]}/reset-password -f {"reset_password"}' ) return update_data_user - def update_rhsso_user(self, username, group_name=None): + def update_sso_user(self, username, group_name=None): update_data_user = Box(RHSSO_USER_UPDATE) - user_details = self.get_rhsso_user_details(username) + user_details = self.get_sso_user_details(username) update_data_user.realm = self.realm update_data_user.userId = f"{user_details['id']}" if group_name: - group_details = self.get_rhsso_groups_details(group_name=group_name) + group_details = self.get_sso_groups_details(group_name=group_name) update_data_user['groupId'] = f"{group_details['id']}" - self.upload_rhsso_entity(update_data_user, "update_user") + self.upload_sso_entity(update_data_user, "update_user") group_path = f"users/{user_details['id']}/groups/{group_details['id']}" self.execute(f"{self.kcadm} update -r {self.realm} {group_path} -f update_user") - def delete_rhsso_user(self, username): + def delete_sso_user(self, username): """Delete the RHSSO user""" - user_details = self.get_rhsso_user_details(username) - self.execute(f"{self.kcadm} delete -r {settings.rhsso.realm} users/{user_details['id']}") + user_details = self.get_sso_user_details(username) + self.execute(f"{self.kcadm} delete -r {settings.sso.realm} users/{user_details['id']}") def create_group(self, group_name=None): """Create the RHSSO group""" @@ -2547,21 +2526,19 @@ def create_group(self, group_name=None): if not group_name: group_name = gen_string('alphanumeric') update_user_group.name = group_name - self.upload_rhsso_entity(update_user_group, "create_group") - result = self.execute( - f"{self.kcadm} create groups -r {settings.rhsso.realm} -f create_group" - ) + self.upload_sso_entity(update_user_group, "create_group") + result = self.execute(f"{self.kcadm} create groups -r {settings.sso.realm} -f create_group") return result.stdout - def delete_rhsso_group(self, group_name): + def delete_sso_group(self, group_name): """Delete the RHSSO group""" - group_details = self.get_rhsso_groups_details(group_name) - self.execute(f"{self.kcadm} delete -r {settings.rhsso.realm} groups/{group_details['id']}") + group_details = self.get_sso_groups_details(group_name) + self.execute(f"{self.kcadm} delete -r {settings.sso.realm} groups/{group_details['id']}") def update_client_configuration(self, json_content): """Update the client configuration""" - client_id = self.get_rhsso_client_id() - self.upload_rhsso_entity(json_content, "update_client_info") + client_id = self.get_sso_client_id() + self.upload_sso_entity(json_content, "update_client_info") update_cmd = ( f"{self.kcadm} update clients/{client_id} " # EOL space important "-f update_client_info -s enabled=true --merge" @@ -2605,6 +2582,40 @@ def set_the_redirect_uri(self): self.update_client_configuration(client_config) +class RHBKHost(SSOHost): + """Class for RHBK functions and setup""" + + def __init__(self, sat_obj, **kwargs): + self.rhbk = True + self.host_url = settings.rhbk.host_url + self.uri = self.host_url + self.host_name = settings.rhbk.host_name + self.host_port = settings.rhbk.host_port + self.realm = settings.rhbk.realm + self.user = settings.rhbk.rhbk_user + self.password = settings.rhbk.rhbk_password + self.kcadm = RHBK_CLI + kwargs['hostname'] = kwargs.get('hostname', settings.rhbk.host_name) + super().__init__(sat_obj, **kwargs) + + +class RHSSOHost(SSOHost): + """Class for RHSSO functions and setup""" + + def __init__(self, sat_obj, **kwargs): + self.rhbk = False + self.host_url = settings.rhsso.host_url + self.uri = self.host_url.replace("https://", "http://") + self.host_name = settings.rhsso.host_name + self.host_port = 443 + self.realm = settings.rhsso.realm + self.user = settings.rhsso.rhsso_user + self.password = settings.rhsso.rhsso_password + self.kcadm = KEY_CLOAK_CLI + kwargs['hostname'] = kwargs.get('hostname', settings.rhsso.host_name) + super().__init__(sat_obj, **kwargs) + + class IPAHost(Host): def __init__(self, sat_obj, **kwargs): self.satellite = sat_obj diff --git a/tests/foreman/api/test_templatesync.py b/tests/foreman/api/test_templatesync.py index c9d86bf17c7..5dc60d9806d 100644 --- a/tests/foreman/api/test_templatesync.py +++ b/tests/foreman/api/test_templatesync.py @@ -28,7 +28,6 @@ FOREMAN_TEMPLATES_NOT_IMPORTED_COUNT, ) from robottelo.logging import logger -from robottelo.utils.issue_handlers import is_open git = settings.git @@ -1066,29 +1065,58 @@ def test_positive_export_log_to_production( @pytest.mark.tier2 @pytest.mark.skip_if_not_set('git') @pytest.mark.parametrize( - 'url', + ('url', 'git_repository', 'use_proxy', 'setup_http_proxy_without_global_settings'), [ - f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}', - f'ssh://git@{git.hostname}:{git.ssh_port}', + ( + f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}', + True, + True, + True, + ), + ( + f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}', + True, + True, + False, + ), + ( + f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}', + True, + False, + True, + ), + ( + f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}', + False, + True, + True, + ), + ( + f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}', + False, + False, + True, + ), + ( + f'http://{git.username}:{git.password}@{git.hostname}:{git.http_port}', + False, + True, + False, + ), + (f'ssh://git@{git.hostname}:{git.ssh_port}', True, False, True), + (f'ssh://git@{git.hostname}:{git.ssh_port}', False, False, True), ], - ids=['http', 'ssh'], - ) - @pytest.mark.parametrize( - 'git_repository', - [True, False], - indirect=True, - ids=['non_empty_repo', 'empty_repo'], - ) - @pytest.mark.parametrize( - 'use_proxy', - [True, False], - ids=['use_proxy', 'do_not_use_proxy'], - ) - @pytest.mark.parametrize( - 'setup_http_proxy_without_global_settings', - [True, False], - indirect=True, - ids=['auth_http_proxy', 'unauth_http_proxy'], + ids=[ + 'auth_http_proxy-use_proxy-non_empty_repo-http', + 'unauth_http_proxy-use_proxy-non_empty_repo-http', + 'auth_http_proxy-do_not_use_proxy-non_empty_repo-http', + 'auth_http_proxy-use_proxy-empty_repo-http', + 'auth_http_proxy-do_not_use_proxy-empty_repo-http', + 'unauth_http_proxy-use_proxy-empty_repo-http', + 'auth_http_proxy-do_not_use_proxy-non_empty_repo-ssh', + 'auth_http_proxy-do_not_use_proxy-empty_repo-ssh', + ], + indirect=['git_repository', 'setup_http_proxy_without_global_settings'], ) def test_positive_export_all_templates_to_repo( self, @@ -1119,15 +1147,7 @@ def test_positive_export_all_templates_to_repo( :CaseImportance: Low """ - # TODO remove this - if is_open('SAT-28933') and 'ssh' in url: - pytest.skip("Temporary skip of SSH tests") - proxy, param = setup_http_proxy_without_global_settings - if not use_proxy and not param: - # only do-not-use one kind of proxy - pytest.skip( - "Invalid parameter combination. DO NOT USE PROXY scenario should only be tested once." - ) + proxy, _ = setup_http_proxy_without_global_settings try: data = { 'repo': f'{url}/{git.username}/{git_repository["name"]}', diff --git a/tests/foreman/cli/test_templatesync.py b/tests/foreman/cli/test_templatesync.py index 1b67bf60c81..aea909482dd 100644 --- a/tests/foreman/cli/test_templatesync.py +++ b/tests/foreman/cli/test_templatesync.py @@ -21,7 +21,6 @@ FOREMAN_TEMPLATE_IMPORT_URL, FOREMAN_TEMPLATE_TEST_TEMPLATE, ) -from robottelo.utils.issue_handlers import is_open git = settings.git @@ -106,24 +105,34 @@ def test_positive_import_force_locked_template( pytest.fail('The template is not imported for force test') @pytest.mark.skip_if_not_set('git') + # TODO: add Github ssh key setup for ssh version of this test @pytest.mark.parametrize( - 'url', + ('url', 'use_proxy_global', 'setup_http_proxy_global'), [ - 'https://github.com/theforeman/community-templates.git', - 'ssh://git@github.com/theforeman/community-templates.git', + ( + 'https://github.com/theforeman/community-templates.git', + True, + True, + ), + ( + 'https://github.com/theforeman/community-templates.git', + True, + False, + ), + ( + 'https://github.com/theforeman/community-templates.git', + False, + True, + ), + ], + ids=[ + 'use_proxy_global-auth_http_proxy_global-http', + 'use_proxy_global-unauth_http_proxy_global-http', + 'do_not_use_proxy_global-auth_http_proxy_global-http', + ], + indirect=[ + 'setup_http_proxy_global', ], - ids=['http', 'ssh'], - ) - @pytest.mark.parametrize( - 'setup_http_proxy_global', - [True, False], - indirect=True, - ids=['auth_http_proxy_global', 'unauth_http_proxy_global'], - ) - @pytest.mark.parametrize( - 'use_proxy_global', - [True, False], - ids=['use_proxy_global', 'do_not_use_proxy_global'], ) @pytest.mark.tier2 def test_positive_import_dir_filtered( @@ -150,15 +159,7 @@ def test_positive_import_dir_filtered( :CaseImportance: Medium """ - # TODO remove this - if is_open('SAT-28933') and 'ssh' in url: - pytest.skip("Temporary skip of SSH tests") - proxy, param = setup_http_proxy_global - if not use_proxy_global and not param: - # only do-not-use one kind of proxy - pytest.skip( - "Invalid parameter combination. DO NOT USE PROXY scenario should only be tested once." - ) + proxy, _ = setup_http_proxy_global pt_name = 'FreeBSD default fake' if target_sat.cli.PartitionTable.list({'search': f'name=\\"{pt_name}\\"'}): target_sat.cli.PartitionTable.update({'name': pt_name, 'locked': 0}) diff --git a/tests/foreman/destructive/test_ldap_authentication.py b/tests/foreman/destructive/test_ldap_authentication.py index fbef0327999..46dd5fe230d 100644 --- a/tests/foreman/destructive/test_ldap_authentication.py +++ b/tests/foreman/destructive/test_ldap_authentication.py @@ -113,7 +113,7 @@ def rhsso_groups_teardown(module_target_sat, default_sso_host): """Teardown the rhsso groups""" yield for group_name in ('sat_users', 'sat_admins'): - default_sso_host.delete_rhsso_group(group_name) + default_sso_host.delete_sso_group(group_name) @pytest.fixture @@ -365,8 +365,8 @@ def test_external_new_user_login_and_check_count_rhsso( :expectedresults: New User created in RHSSO server should able to get log-in and correct count shown for external users """ - client_id = default_sso_host.get_rhsso_client_id() - user_details = default_sso_host.create_new_rhsso_user(client_id) + client_id = default_sso_host.get_sso_client_id() + user_details = default_sso_host.create_new_sso_user(client_id) login_details = { 'username': user_details['username'], 'password': settings.rhsso.rhsso_password, @@ -379,7 +379,7 @@ def test_external_new_user_login_and_check_count_rhsso( updated_count = len([user for user in users if user.auth_source_name == 'External']) assert updated_count == external_user_count + 1 # checking delete user can't login anymore - default_sso_host.delete_rhsso_user(user_details['username']) + default_sso_host.delete_sso_user(user_details['username']) with module_target_sat.ui_session(login=False) as rhsso_session: with pytest.raises(NavigationTriesExceeded) as error: rhsso_session.rhsso_login.login(login_details) @@ -422,7 +422,7 @@ def test_login_failure_rhsso_user_if_internal_user_exist( login=username, password=settings.rhsso.rhsso_password, ).create() - external_rhsso_user = default_sso_host.create_new_rhsso_user(username=username) + external_rhsso_user = default_sso_host.create_new_sso_user(username=username) login_details = { 'username': external_rhsso_user['username'], 'password': settings.rhsso.rhsso_password, @@ -459,7 +459,7 @@ def test_user_permissions_rhsso_user_after_group_delete( :expectedresults: external rhsso user's permissions should get revoked after external rhsso group deletion. """ - default_sso_host.get_rhsso_client_id() + default_sso_host.get_sso_client_id() username = settings.rhsso.rhsso_user location_name = gen_string('alpha') login_details = { @@ -469,7 +469,7 @@ def test_user_permissions_rhsso_user_after_group_delete( group_name = gen_string('alpha') default_sso_host.create_group(group_name=group_name) - default_sso_host.update_rhsso_user(username, group_name=group_name) + default_sso_host.update_sso_user(username, group_name=group_name) # creating satellite external group user_group = module_target_sat.cli_factory.usergroup({'admin': 1, 'name': group_name}) @@ -491,7 +491,7 @@ def test_user_permissions_rhsso_user_after_group_delete( assert login_details['username'] in current_user # delete the rhsso group and verify the rhsso-user permissions - default_sso_host.delete_rhsso_group(group_name=group_name) + default_sso_host.delete_sso_group(group_name=group_name) with module_target_sat.ui_session(login=False) as rhsso_session: rhsso_session.rhsso_login.login(login_details) with pytest.raises(NavigationTriesExceeded) as error: @@ -527,7 +527,7 @@ def test_user_permissions_rhsso_user_multiple_group( :expectedresults: external rhsso user have highest level of permissions from among the multiple groups. """ - default_sso_host.get_rhsso_client_id() + default_sso_host.get_sso_client_id() username = settings.rhsso.rhsso_user location_name = gen_string('alpha') login_details = { @@ -548,7 +548,7 @@ def test_user_permissions_rhsso_user_multiple_group( for group_name, argument in zip(group_names, arguments, strict=True): # adding/creating rhsso groups default_sso_host.create_group(group_name=group_name) - default_sso_host.update_rhsso_user(username, group_name=group_name) + default_sso_host.update_sso_user(username, group_name=group_name) argument['name'] = group_name # creating satellite external groups diff --git a/tests/foreman/ui/test_templatesync.py b/tests/foreman/ui/test_templatesync.py index d55e43d51d4..70b6388439f 100644 --- a/tests/foreman/ui/test_templatesync.py +++ b/tests/foreman/ui/test_templatesync.py @@ -33,15 +33,20 @@ def templates_loc(templates_org, module_target_sat): @pytest.mark.skip_if_not_set('git') @pytest.mark.parametrize( - 'setup_http_proxy_without_global_settings', - [True, False], - indirect=True, - ids=['auth_http_proxy', 'unauth_http_proxy'], -) -@pytest.mark.parametrize( - 'use_proxy', - [True, False], - ids=['use_proxy', 'do_not_use_proxy'], + ('setup_http_proxy_without_global_settings', 'use_proxy'), + [ + (True, True), + (False, True), + (True, False), + ], + ids=[ + 'auth_http_proxy-use_proxy', + 'unauth_http_proxy-use_proxy', + 'auth_http_proxy-do_not_use_proxy', + ], + indirect=[ + 'setup_http_proxy_without_global_settings', + ], ) @pytest.mark.tier2 @pytest.mark.upgrade