Fix hacking checks from neutron-lib 3.21.1

Only N535 (``check_no_eventlet_imports``) is disabled temporarily.

Related-Bug: #2121609
Signed-off-by: Rodolfo Alonso Hernandez <ralonsoh@redhat.com>
Change-Id: I5df2157635d3800f874d6027e257f796d873a0c1
This commit is contained in:
Rodolfo Alonso Hernandez
2025-09-09 06:31:15 +00:00
parent 50e00d232b
commit ccf4c29b14
12 changed files with 39 additions and 32 deletions

View File

@@ -16,6 +16,7 @@ import collections
import ipaddress
import threading
from neutron_lib._i18n import _
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@@ -53,8 +54,8 @@ LOCAL_CLUSTER_OVN_TABLES = ['Logical_Switch', 'Logical_Switch_Port',
def _validate_ovn_version(distributed, idl):
if not distributed and 'gateway_port' not in idl.tables['NAT'].columns:
raise RuntimeError(
"Centralized routing requires gateway_port column in the "
"OVN_Northbound schema. Please update OVN to 23.09.0 or later.")
_("Centralized routing requires gateway_port column in the "
"OVN_Northbound schema. Please update OVN to 23.09.0 or later."))
class NATExposer:
@@ -63,11 +64,11 @@ class NATExposer:
def expose_fip_from_nat(self, nat):
raise RuntimeError(
"The exposer does not have distributed flag set yet")
_("The exposer does not have distributed flag set yet"))
def withdraw_fip_from_nat(self, nat):
raise RuntimeError(
"The exposer does not have distributed flag set yet")
_("The exposer does not have distributed flag set yet"))
@property
def distributed(self):
@@ -83,7 +84,7 @@ class NATExposer:
self.withdraw_fip_from_nat = self._withdraw_nat_centralized
def _expose_nat_distributed(self, nat):
raise NotImplementedError("Distributed NAT is not implemented yet.")
raise NotImplementedError(_("Distributed NAT is not implemented yet."))
def _expose_nat_centralized(self, nat):
net_id = nat.external_ids[constants.OVN_FIP_NET_EXT_ID_KEY]
@@ -104,7 +105,7 @@ class NATExposer:
self.agent.expose_fip(nat.external_ip, mac, ls_name, lsp)
def _withdraw_nat_distributed(self, nat):
raise NotImplementedError("Distributed NAT is not implemented yet.")
raise NotImplementedError(_("Distributed NAT is not implemented yet."))
def _withdraw_nat_centralized(self, nat):
lsp = self.agent.nb_idl.lsp_get(nat.logical_port[0]).execute()

View File

@@ -15,6 +15,7 @@
import collections
import netaddr
from neutron_lib._i18n import _
from oslo_config import cfg
from oslo_log import log as logging
@@ -513,10 +514,10 @@ def _offset_for_vni_and_vlan(vni: int, vlan: str):
def setup(ovs_bridge, vni, evpn_opts, mode=constants.OVN_EVPN_TYPE_L3,
ovs_flows={}) -> EvpnBridge:
ovs_flows=None) -> EvpnBridge:
# This method will either create the EvpnBridge or return the one that
# already exists for the current vni.
ovs_flows = ovs_flows or {}
vni = int(vni) # make sure the vni is a int, for lookup purposes
if local_bridges.get(vni, None) is None:
@@ -537,8 +538,8 @@ def lookup(ovs_bridge: str, vlan: str) -> EvpnBridge:
if str(vlan) in br.vlans:
return br
raise KeyError('Could not locate EVPN for bridge %s and/or vlan %s' % (
ovs_bridge, vlan))
raise KeyError(_('Could not locate EVPN for bridge %s and/or vlan %s' %
(ovs_bridge, vlan)))
def lookup_vlan(ovs_bridge: str, vlan: str) -> VlanDev:

View File

@@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import tempfile
from jinja2 import Template
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from ovn_bgp_agent import constants
import ovn_bgp_agent.privileged.vtysh
@@ -116,7 +116,7 @@ router bgp {{ bgp_as }} vrf {{ vrf_name }}
def _get_router_id():
output = ovn_bgp_agent.privileged.vtysh.run_vtysh_command(
command='show ip bgp summary json')
return json.loads(output).get('ipv4Unicast', {}).get('routerId')
return jsonutils.loads(output).get('ipv4Unicast', {}).get('routerId')
def _run_vtysh_config_with_tempfile(vrf_config):

View File

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron_lib._i18n import _
from oslo_config import cfg
from oslo_log import log as logging
from ovs.stream import Stream
@@ -191,8 +192,8 @@ class StaticMACBindingAddCommand(command.AddCommand):
# With the current database schema, this cannot happen, but
# better safe than sorry.
raise RuntimeError(
"Unexpected duplicates in database for port %s "
"and ip %s" % (self.port, self.ip))
_("Unexpected duplicates in database for port %s "
"and ip %s" % (self.port, self.ip)))
binding = static_mac_binding_result[0]
if self.may_exist:
# When no changes are made to a record, the parent
@@ -204,8 +205,8 @@ class StaticMACBindingAddCommand(command.AddCommand):
return
else:
raise RuntimeError(
"Static MAC Binding entry for port %s and ip %s exists" % (
self.port, self.ip))
_("Static MAC Binding entry for port %s and ip %s exists" %
(self.port, self.ip)))
binding = txn.insert(self.api.tables[self.table_name])
binding.logical_port = self.port
binding.ip = self.ip
@@ -239,8 +240,8 @@ class StaticMACBindingDelCommand(command.BaseCommand):
# With the current database schema, this cannot happen, but
# better safe than sorry.
raise RuntimeError(
"Unexpected duplicates in database for port %s "
"and ip %s" % (self.port, self.ip))
_("Unexpected duplicates in database for port %s "
"and ip %s" % (self.port, self.ip)))
binding = static_mac_binding_result[0]
binding.delete()
return
@@ -248,8 +249,8 @@ class StaticMACBindingDelCommand(command.BaseCommand):
return
else:
raise RuntimeError(
"Static MAC Binding entry for port %s and ip %s does not "
"exist" % (self.port, self.ip))
_("Static MAC Binding entry for port %s and ip %s does not "
"exist" % (self.port, self.ip)))
class GetLSPsForGwChassisCommand(command.ReadOnlyCommand):

View File

@@ -31,7 +31,8 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def ensure_base_wiring_config(idl, ovs_idl, ovn_idl=None, routing_tables={}):
def ensure_base_wiring_config(idl, ovs_idl, ovn_idl=None, routing_tables=None):
routing_tables = routing_tables or {}
if CONF.exposing_method == constants.EXPOSE_METHOD_UNDERLAY:
return _ensure_base_wiring_config_underlay(idl, ovs_idl,
routing_tables)

View File

@@ -49,6 +49,8 @@ def _get_test_log_path():
DEFAULT_LOG_DIR = os.path.join(_get_test_log_path(), 'functional-logs')
# NOTE(ralonsoh): this timeout catch method needs to be reimplemented without
# using eventlet.
class _CatchTimeoutMetaclass(abc.ABCMeta):
def __init__(cls, name, bases, dct):
super(_CatchTimeoutMetaclass, cls).__init__(name, bases, dct)

View File

@@ -460,8 +460,8 @@ class TestNBOVNBGPDriver(test_base.TestCase):
bridge_device, bridge_vlan = (
self.nb_bgp_driver._get_bridge_for_localnet_port(localnet))
self.assertEqual(bridge_device, None)
self.assertEqual(bridge_vlan, None)
self.assertIsNone(bridge_device)
self.assertIsNone(bridge_vlan)
def test_is_ip_exposed(self):
self.nb_bgp_driver._exposed_ips['fake-switch'] = {'fake-ip': {}}

View File

@@ -103,8 +103,7 @@ class TestDriverUtils(test_base.TestCase):
def test_get_port_chassis_no_information(self):
row = utils.create_row()
self.assertEqual(driver_utils.get_port_chassis(row, chassis='foo'),
None)
self.assertIsNone(driver_utils.get_port_chassis(row, chassis='foo'))
def test_check_name_prefix(self):
lb = utils.create_row(name='some-name')

View File

@@ -280,7 +280,8 @@ class TestEVPN(test_base.TestCase):
self.assertTrue(vlan_dev._veth_created)
self.assertTrue(vlan_dev._setup_done)
def test_evpnbridge_vlan_setup_l3(self, custom_ips=[]):
def test_evpnbridge_vlan_setup_l3(self, custom_ips=None):
custom_ips = custom_ips or []
vlan_tag = 4094
vlan_tag_str = '4094'
_, evpn_bridge, vlan_dev = self._create_bridge_and_vlan(vlan_tag)

View File

@@ -33,15 +33,15 @@ class TestHelpers(test_base.TestCase):
bridge_mappings = ""
ret_net, ret_bridge = helpers.parse_bridge_mapping(bridge_mappings)
self.assertEqual(ret_net, None)
self.assertEqual(ret_bridge, None)
self.assertIsNone(ret_net)
self.assertIsNone(ret_bridge)
def test_parse_bridge_mappings_wrong_format(self):
bridge_mappings = "provider-1:br-ex:extra_field"
ret_net, ret_bridge = helpers.parse_bridge_mapping(bridge_mappings)
self.assertEqual(ret_net, None)
self.assertEqual(ret_bridge, None)
self.assertIsNone(ret_net)
self.assertIsNone(ret_bridge)
class TestHelperGetLBDatapaths(test_base.TestCase):

View File

@@ -46,7 +46,8 @@ def get_ip_version(ip):
wait=tenacity.wait_exponential(multiplier=0.02, max=1),
stop=tenacity.stop_after_delay(8),
reraise=True)
def get_interfaces(filter_out=[]):
def get_interfaces(filter_out=None):
filter_out = filter_out or []
with pyroute2.IPRoute() as ipr:
return [iface.get_attr('IFLA_IFNAME') for iface in ipr.get_links()
if iface.get_attr('IFLA_IFNAME') not in filter_out]

View File

@@ -67,6 +67,6 @@ commands =
# E123, E125 skipped as they are invalid PEP-8.
show-source = True
ignore = E123,E125,W504
ignore = E123,E125,W504,N535
builtins = _
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build