|
|
|
@@ -17,6 +17,7 @@ import copy
|
|
|
|
|
import random
|
|
|
|
|
from unittest import mock
|
|
|
|
|
|
|
|
|
|
from octavia_lib.common import constants as lib_consts
|
|
|
|
|
from oslo_config import cfg
|
|
|
|
|
from oslo_config import fixture as oslo_fixture
|
|
|
|
|
from oslo_utils import uuidutils
|
|
|
|
@@ -1205,6 +1206,72 @@ class TestListener(base.BaseAPITest):
|
|
|
|
|
self.create_listener(constants.PROTOCOL_TCP, 80,
|
|
|
|
|
self.lb_id, allowed_cidrs=allowed_cidrs)
|
|
|
|
|
|
|
|
|
|
# TODO(johnsom) Fix this when there is a noop certificate manager
|
|
|
|
|
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
|
|
|
|
|
def test_create_with_tls_versions(self, mock_cert_data):
|
|
|
|
|
cert1 = data_models.TLSContainer(certificate='cert 1')
|
|
|
|
|
mock_cert_data.return_value = {'tls_cert': cert1}
|
|
|
|
|
cert_id = uuidutils.generate_uuid()
|
|
|
|
|
tls_versions = constants.TLS_VERSIONS_OWASP_SUITE_B
|
|
|
|
|
listener = self.create_listener(constants.PROTOCOL_TERMINATED_HTTPS,
|
|
|
|
|
80, self.lb_id,
|
|
|
|
|
default_tls_container_ref=cert_id,
|
|
|
|
|
tls_versions=tls_versions)
|
|
|
|
|
listener_path = self.LISTENER_PATH.format(
|
|
|
|
|
listener_id=listener['listener']['id'])
|
|
|
|
|
get_listener = self.get(listener_path).json['listener']
|
|
|
|
|
self.assertEqual(tls_versions, get_listener['tls_versions'])
|
|
|
|
|
|
|
|
|
|
# TODO(johnsom) Fix this when there is a noop certificate manager
|
|
|
|
|
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
|
|
|
|
|
def test_create_with_tls_versions_negative(self, mock_cert_data):
|
|
|
|
|
cert1 = data_models.TLSContainer(certificate='cert 1')
|
|
|
|
|
mock_cert_data.return_value = {'tls_cert': cert1}
|
|
|
|
|
cert_id = uuidutils.generate_uuid()
|
|
|
|
|
req_dict = {'protocol': constants.PROTOCOL_TERMINATED_HTTPS,
|
|
|
|
|
'protocol_port': 80,
|
|
|
|
|
'loadbalancer_id': self.lb_id,
|
|
|
|
|
'default_tls_container_ref': cert_id,
|
|
|
|
|
'tls_versions': [lib_consts.TLS_VERSION_1_3, 'insecure']}
|
|
|
|
|
res = self.post(self.LISTENERS_PATH, self._build_body(req_dict),
|
|
|
|
|
status=400)
|
|
|
|
|
fault = res.json['faultstring']
|
|
|
|
|
self.assertIn('Validation failure: Invalid TLS versions', fault)
|
|
|
|
|
self.assert_correct_status(lb_id=self.lb_id)
|
|
|
|
|
|
|
|
|
|
# TODO(johnsom) Fix this when there is a noop certificate manager
|
|
|
|
|
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
|
|
|
|
|
def test_create_with_tls_ciphers(self, mock_cert_data):
|
|
|
|
|
cert1 = data_models.TLSContainer(certificate='cert 1')
|
|
|
|
|
mock_cert_data.return_value = {'tls_cert': cert1}
|
|
|
|
|
cert_id = uuidutils.generate_uuid()
|
|
|
|
|
tls_ciphers = constants.CIPHERS_OWASP_SUITE_B
|
|
|
|
|
listener = self.create_listener(constants.PROTOCOL_TERMINATED_HTTPS,
|
|
|
|
|
80, self.lb_id,
|
|
|
|
|
default_tls_container_ref=cert_id,
|
|
|
|
|
tls_ciphers=tls_ciphers)
|
|
|
|
|
listener_path = self.LISTENER_PATH.format(
|
|
|
|
|
listener_id=listener['listener']['id'])
|
|
|
|
|
get_listener = self.get(listener_path).json['listener']
|
|
|
|
|
self.assertEqual(tls_ciphers, get_listener['tls_ciphers'])
|
|
|
|
|
|
|
|
|
|
# TODO(johnsom) Fix this when there is a noop certificate manager
|
|
|
|
|
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
|
|
|
|
|
def test_create_with_tls_ciphers_negative(self, mock_cert_data):
|
|
|
|
|
cert1 = data_models.TLSContainer(certificate='cert 1')
|
|
|
|
|
mock_cert_data.return_value = {'tls_cert': cert1}
|
|
|
|
|
cert_id = uuidutils.generate_uuid()
|
|
|
|
|
req_dict = {'protocol': constants.PROTOCOL_TERMINATED_HTTPS,
|
|
|
|
|
'protocol_port': 80,
|
|
|
|
|
'loadbalancer_id': self.lb_id,
|
|
|
|
|
'default_tls_container_ref': cert_id,
|
|
|
|
|
'tls_ciphers': ['cipher-insecure']}
|
|
|
|
|
res = self.post(self.LISTENERS_PATH, self._build_body(req_dict),
|
|
|
|
|
status=400)
|
|
|
|
|
fault = res.json['faultstring']
|
|
|
|
|
self.assertIn('Invalid input for field/attribute tls_ciphers', fault)
|
|
|
|
|
self.assert_correct_status(lb_id=self.lb_id)
|
|
|
|
|
|
|
|
|
|
def _test_negative_create_with_headers(self, protocol):
|
|
|
|
|
req_dict = {'name': 'listener1', 'default_pool_id': None,
|
|
|
|
|
'description': 'desc1',
|
|
|
|
@@ -1357,6 +1424,77 @@ class TestListener(base.BaseAPITest):
|
|
|
|
|
api_listener = response.json.get(self.root_tag)
|
|
|
|
|
self.assertEqual('listener2', api_listener['name'])
|
|
|
|
|
|
|
|
|
|
# TODO(johnsom) Fix this when there is a noop certificate manager
|
|
|
|
|
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
|
|
|
|
|
def test_update_with_tls_versions(self, mock_cert_data):
|
|
|
|
|
cert_id = uuidutils.generate_uuid()
|
|
|
|
|
cert1 = data_models.TLSContainer(certificate='cert 1')
|
|
|
|
|
mock_cert_data.return_value = {'tls_cert': cert1}
|
|
|
|
|
tls_versions_orig = [lib_consts.TLS_VERSION_1_1]
|
|
|
|
|
tls_versions = [lib_consts.TLS_VERSION_1_2, lib_consts.TLS_VERSION_1_3]
|
|
|
|
|
listener = self.create_listener(
|
|
|
|
|
constants.PROTOCOL_TERMINATED_HTTPS, 80, self.lb_id,
|
|
|
|
|
default_tls_container_ref=cert_id,
|
|
|
|
|
tls_versions=tls_versions_orig)
|
|
|
|
|
self.set_lb_status(self.lb_id)
|
|
|
|
|
listener_path = self.LISTENER_PATH.format(
|
|
|
|
|
listener_id=listener['listener']['id'])
|
|
|
|
|
get_listener = self.get(listener_path).json['listener']
|
|
|
|
|
self.assertEqual(tls_versions_orig,
|
|
|
|
|
get_listener.get('tls_versions'))
|
|
|
|
|
self.put(listener_path,
|
|
|
|
|
self._build_body({'tls_versions': tls_versions}))
|
|
|
|
|
get_listener = self.get(listener_path).json['listener']
|
|
|
|
|
self.assertEqual(tls_versions, get_listener.get('tls_versions'))
|
|
|
|
|
|
|
|
|
|
# TODO(johnsom) Fix this when there is a noop certificate manager
|
|
|
|
|
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
|
|
|
|
|
def test_update_with_tls_versions_negative(self, mock_cert_data):
|
|
|
|
|
cert_id = uuidutils.generate_uuid()
|
|
|
|
|
cert1 = data_models.TLSContainer(certificate='cert 1')
|
|
|
|
|
mock_cert_data.return_value = {'tls_cert': cert1}
|
|
|
|
|
tls_versions_orig = [lib_consts.TLS_VERSION_1_1]
|
|
|
|
|
listener = self.create_listener(
|
|
|
|
|
constants.PROTOCOL_TERMINATED_HTTPS, 80, self.lb_id,
|
|
|
|
|
default_tls_container_ref=cert_id,
|
|
|
|
|
tls_versions=tls_versions_orig)
|
|
|
|
|
self.set_lb_status(self.lb_id)
|
|
|
|
|
listener_path = self.LISTENER_PATH.format(
|
|
|
|
|
listener_id=listener['listener']['id'])
|
|
|
|
|
get_listener = self.get(listener_path).json['listener']
|
|
|
|
|
self.assertEqual(tls_versions_orig, get_listener.get('tls_versions'))
|
|
|
|
|
|
|
|
|
|
req_dict = {'tls_versions': [lib_consts.TLS_VERSION_1_3, 'insecure']}
|
|
|
|
|
res = self.put(listener_path, self._build_body(req_dict),
|
|
|
|
|
status=400)
|
|
|
|
|
fault = res.json['faultstring']
|
|
|
|
|
self.assertIn('Validation failure: Invalid TLS versions:', fault)
|
|
|
|
|
self.assert_correct_status(lb_id=self.lb_id)
|
|
|
|
|
|
|
|
|
|
# TODO(johnsom) Fix this when there is a noop certificate manager
|
|
|
|
|
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
|
|
|
|
|
def test_update_with_tls_ciphers_negative(self, mock_cert_data):
|
|
|
|
|
cert_id = uuidutils.generate_uuid()
|
|
|
|
|
cert1 = data_models.TLSContainer(certificate='cert 1')
|
|
|
|
|
mock_cert_data.return_value = {'tls_cert': cert1}
|
|
|
|
|
tls_ciphers_orig = constants.CIPHERS_OWASP_SUITE_B
|
|
|
|
|
listener = self.create_listener(
|
|
|
|
|
constants.PROTOCOL_TERMINATED_HTTPS, 80, self.lb_id,
|
|
|
|
|
default_tls_container_ref=cert_id,
|
|
|
|
|
tls_ciphers=tls_ciphers_orig)
|
|
|
|
|
self.set_lb_status(self.lb_id)
|
|
|
|
|
listener_path = self.LISTENER_PATH.format(
|
|
|
|
|
listener_id=listener['listener']['id'])
|
|
|
|
|
get_listener = self.get(listener_path).json['listener']
|
|
|
|
|
self.assertEqual(tls_ciphers_orig, get_listener.get('tls_ciphers'))
|
|
|
|
|
|
|
|
|
|
req_dict = {'tls_ciphers': ['cipher-insecure']}
|
|
|
|
|
res = self.put(listener_path, self._build_body(req_dict),
|
|
|
|
|
status=400)
|
|
|
|
|
fault = res.json['faultstring']
|
|
|
|
|
self.assertIn('Invalid input for field/attribute tls_ciphers', fault)
|
|
|
|
|
self.assert_correct_status(lb_id=self.lb_id)
|
|
|
|
|
|
|
|
|
|
def test_negative_update_udp_case(self):
|
|
|
|
|
api_listener = self.create_listener(constants.PROTOCOL_UDP, 6666,
|
|
|
|
|
self.lb_id).get(self.root_tag)
|
|
|
|
@@ -1611,6 +1749,12 @@ class TestListener(base.BaseAPITest):
|
|
|
|
|
self.conf.config(group='haproxy_amphora', timeout_member_connect=21)
|
|
|
|
|
self.conf.config(group='haproxy_amphora', timeout_member_data=22)
|
|
|
|
|
self.conf.config(group='haproxy_amphora', timeout_tcp_inspect=23)
|
|
|
|
|
self.conf.config(group='api_settings',
|
|
|
|
|
default_listener_tls_versions=(
|
|
|
|
|
constants.TLS_VERSIONS_OWASP_SUITE_B))
|
|
|
|
|
self.conf.config(group='api_settings',
|
|
|
|
|
default_listener_ciphers=(
|
|
|
|
|
constants.CIPHERS_OWASP_SUITE_B))
|
|
|
|
|
|
|
|
|
|
self.cert_manager_mock().get_secret.side_effect = [
|
|
|
|
|
sample_certs.X509_CA_CERT, sample_certs.X509_CA_CRL,
|
|
|
|
@@ -1635,7 +1779,9 @@ class TestListener(base.BaseAPITest):
|
|
|
|
|
timeout_member_data=3, timeout_tcp_inspect=4,
|
|
|
|
|
client_authentication=constants.CLIENT_AUTH_OPTIONAL,
|
|
|
|
|
client_crl_container_ref=crl_tls_uuid,
|
|
|
|
|
client_ca_tls_container_ref=ca_tls_uuid).get(self.root_tag)
|
|
|
|
|
client_ca_tls_container_ref=ca_tls_uuid,
|
|
|
|
|
tls_versions=[lib_consts.TLS_VERSION_1_3],
|
|
|
|
|
tls_ciphers='TLS_AES_256_GCM_SHA384').get(self.root_tag)
|
|
|
|
|
self.set_lb_status(self.lb_id)
|
|
|
|
|
unset_params = {
|
|
|
|
|
'name': None, 'description': None, 'connection_limit': None,
|
|
|
|
@@ -1644,7 +1790,8 @@ class TestListener(base.BaseAPITest):
|
|
|
|
|
'timeout_member_connect': None, 'timeout_member_data': None,
|
|
|
|
|
'timeout_tcp_inspect': None, 'client_ca_tls_container_ref': None,
|
|
|
|
|
'client_authentication': None, 'default_pool_id': None,
|
|
|
|
|
'client_crl_container_ref': None}
|
|
|
|
|
'client_crl_container_ref': None, 'tls_versions': None,
|
|
|
|
|
'tls_ciphers': None}
|
|
|
|
|
body = self._build_body(unset_params)
|
|
|
|
|
listener_path = self.LISTENER_PATH.format(
|
|
|
|
|
listener_id=listener['id'])
|
|
|
|
@@ -1666,6 +1813,10 @@ class TestListener(base.BaseAPITest):
|
|
|
|
|
self.assertEqual(constants.CLIENT_AUTH_NONE,
|
|
|
|
|
api_listener['client_authentication'])
|
|
|
|
|
self.assertIsNone(api_listener['default_pool_id'])
|
|
|
|
|
self.assertEqual(constants.TLS_VERSIONS_OWASP_SUITE_B,
|
|
|
|
|
api_listener['tls_versions'])
|
|
|
|
|
self.assertEqual(constants.CIPHERS_OWASP_SUITE_B,
|
|
|
|
|
api_listener['tls_ciphers'])
|
|
|
|
|
|
|
|
|
|
@mock.patch('octavia.common.tls_utils.cert_parser.load_certificates_data')
|
|
|
|
|
def test_update_with_bad_ca_cert(self, mock_cert_data):
|
|
|
|
|