Change IPAM DB API to use context instead of session object.

This code is not being used outside neutron tree so it's
safe to change the api to use context instead of session.
This change is required for implementing Ipam OVO and is
consistent with rest of the DB API code in neutron.

Change-Id: I600ce4384632bf013846e8aa1618acb126f7c598
Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db
This commit is contained in:
Mohit Malik
2016-08-30 13:51:17 -07:00
committed by Ihar Hrachyshka
parent dc8ad65838
commit cc861ad30f
4 changed files with 69 additions and 71 deletions

View File

@@ -23,8 +23,8 @@ from neutron.ipam.drivers.neutrondb_ipam import db_models
class IpamSubnetManager(object): class IpamSubnetManager(object):
@classmethod @classmethod
def load_by_neutron_subnet_id(cls, session, neutron_subnet_id): def load_by_neutron_subnet_id(cls, context, neutron_subnet_id):
return session.query(db_models.IpamSubnet).filter_by( return context.session.query(db_models.IpamSubnet).filter_by(
neutron_subnet_id=neutron_subnet_id).first() neutron_subnet_id=neutron_subnet_id).first()
def __init__(self, ipam_subnet_id, neutron_subnet_id): def __init__(self, ipam_subnet_id, neutron_subnet_id):
@@ -35,13 +35,13 @@ class IpamSubnetManager(object):
def neutron_id(self): def neutron_id(self):
return self._neutron_subnet_id return self._neutron_subnet_id
def create(self, session): def create(self, context):
"""Create database models for an IPAM subnet. """Create database models for an IPAM subnet.
This method creates a subnet resource for the IPAM driver and This method creates a subnet resource for the IPAM driver and
associates it with its neutron identifier, if specified. associates it with its neutron identifier, if specified.
:param session: database sesssion. :param context: neutron api request context
:returns: the idenfier of created IPAM subnet :returns: the idenfier of created IPAM subnet
""" """
if not self._ipam_subnet_id: if not self._ipam_subnet_id:
@@ -49,23 +49,23 @@ class IpamSubnetManager(object):
ipam_subnet = db_models.IpamSubnet( ipam_subnet = db_models.IpamSubnet(
id=self._ipam_subnet_id, id=self._ipam_subnet_id,
neutron_subnet_id=self._neutron_subnet_id) neutron_subnet_id=self._neutron_subnet_id)
session.add(ipam_subnet) context.session.add(ipam_subnet)
return self._ipam_subnet_id return self._ipam_subnet_id
@classmethod @classmethod
def delete(cls, session, neutron_subnet_id): def delete(cls, context, neutron_subnet_id):
"""Delete IPAM subnet. """Delete IPAM subnet.
IPAM subnet no longer has foreign key to neutron subnet, IPAM subnet no longer has foreign key to neutron subnet,
so need to perform delete manually so need to perform delete manually
:param session: database sesssion :param context: neutron api request context
:param neutron_subnet_id: neutron subnet id associated with ipam subnet :param neutron_subnet_id: neutron subnet id associated with ipam subnet
""" """
return session.query(db_models.IpamSubnet).filter_by( return context.session.query(db_models.IpamSubnet).filter_by(
neutron_subnet_id=neutron_subnet_id).delete() neutron_subnet_id=neutron_subnet_id).delete()
def create_pool(self, session, pool_start, pool_end): def create_pool(self, context, pool_start, pool_end):
"""Create an allocation pool for the subnet. """Create an allocation pool for the subnet.
This method does not perform any validation on parameters; it simply This method does not perform any validation on parameters; it simply
@@ -79,50 +79,50 @@ class IpamSubnetManager(object):
ipam_subnet_id=self._ipam_subnet_id, ipam_subnet_id=self._ipam_subnet_id,
first_ip=pool_start, first_ip=pool_start,
last_ip=pool_end) last_ip=pool_end)
session.add(ip_pool) context.session.add(ip_pool)
return ip_pool return ip_pool
def delete_allocation_pools(self, session): def delete_allocation_pools(self, context):
"""Remove all allocation pools for the current subnet. """Remove all allocation pools for the current subnet.
:param session: database session :param context: neutron api request context
""" """
session.query(db_models.IpamAllocationPool).filter_by( context.session.query(db_models.IpamAllocationPool).filter_by(
ipam_subnet_id=self._ipam_subnet_id).delete() ipam_subnet_id=self._ipam_subnet_id).delete()
def list_pools(self, session): def list_pools(self, context):
"""Return pools for the current subnet.""" """Return pools for the current subnet."""
return session.query( return context.session.query(
db_models.IpamAllocationPool).filter_by( db_models.IpamAllocationPool).filter_by(
ipam_subnet_id=self._ipam_subnet_id) ipam_subnet_id=self._ipam_subnet_id)
def check_unique_allocation(self, session, ip_address): def check_unique_allocation(self, context, ip_address):
"""Validate that the IP address on the subnet is not in use.""" """Validate that the IP address on the subnet is not in use."""
iprequest = session.query(db_models.IpamAllocation).filter_by( iprequest = context.session.query(db_models.IpamAllocation).filter_by(
ipam_subnet_id=self._ipam_subnet_id, status='ALLOCATED', ipam_subnet_id=self._ipam_subnet_id, status='ALLOCATED',
ip_address=ip_address).first() ip_address=ip_address).first()
if iprequest: if iprequest:
return False return False
return True return True
def list_allocations(self, session, status='ALLOCATED'): def list_allocations(self, context, status='ALLOCATED'):
"""Return current allocations for the subnet. """Return current allocations for the subnet.
:param session: database session :param context: neutron api request context
:param status: IP allocation status :param status: IP allocation status
:returns: a list of IP allocation as instance of :returns: a list of IP allocation as instance of
neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAllocation neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAllocation
""" """
return session.query( return context.session.query(
db_models.IpamAllocation).filter_by( db_models.IpamAllocation).filter_by(
ipam_subnet_id=self._ipam_subnet_id, ipam_subnet_id=self._ipam_subnet_id,
status=status) status=status)
def create_allocation(self, session, ip_address, def create_allocation(self, context, ip_address,
status='ALLOCATED'): status='ALLOCATED'):
"""Create an IP allocation entry. """Create an IP allocation entry.
:param session: database session :param context: neutron api request context
:param ip_address: the IP address to allocate :param ip_address: the IP address to allocate
:param status: IP allocation status :param status: IP allocation status
""" """
@@ -130,16 +130,16 @@ class IpamSubnetManager(object):
ip_address=ip_address, ip_address=ip_address,
status=status, status=status,
ipam_subnet_id=self._ipam_subnet_id) ipam_subnet_id=self._ipam_subnet_id)
session.add(ip_request) context.session.add(ip_request)
def delete_allocation(self, session, ip_address): def delete_allocation(self, context, ip_address):
"""Remove an IP allocation for this subnet. """Remove an IP allocation for this subnet.
:param session: database session :param context: neutron api request context
:param ip_address: IP address for which the allocation entry should :param ip_address: IP address for which the allocation entry should
be removed. be removed.
""" """
return session.query(db_models.IpamAllocation).filter_by( return context.session.query(db_models.IpamAllocation).filter_by(
ip_address=ip_address, ip_address=ip_address,
ipam_subnet_id=self._ipam_subnet_id).delete( ipam_subnet_id=self._ipam_subnet_id).delete(
synchronize_session=False) synchronize_session=False)

View File

@@ -43,14 +43,14 @@ class NeutronDbSubnet(ipam_base.Subnet):
""" """
@classmethod @classmethod
def create_allocation_pools(cls, subnet_manager, session, pools, cidr): def create_allocation_pools(cls, subnet_manager, context, pools, cidr):
for pool in pools: for pool in pools:
# IPv6 addresses that start '::1', '::2', etc cause IP version # IPv6 addresses that start '::1', '::2', etc cause IP version
# ambiguity when converted to integers by pool.first and pool.last. # ambiguity when converted to integers by pool.first and pool.last.
# Infer the IP version from the subnet cidr. # Infer the IP version from the subnet cidr.
ip_version = cidr.version ip_version = cidr.version
subnet_manager.create_pool( subnet_manager.create_pool(
session, context,
netaddr.IPAddress(pool.first, ip_version).format(), netaddr.IPAddress(pool.first, ip_version).format(),
netaddr.IPAddress(pool.last, ip_version).format()) netaddr.IPAddress(pool.last, ip_version).format())
@@ -61,8 +61,7 @@ class NeutronDbSubnet(ipam_base.Subnet):
ipam_subnet_id, ipam_subnet_id,
subnet_request.subnet_id) subnet_request.subnet_id)
# Create subnet resource # Create subnet resource
session = ctx.session subnet_manager.create(ctx)
subnet_manager.create(session)
# If allocation pools are not specified, define them around # If allocation pools are not specified, define them around
# the subnet's gateway IP # the subnet's gateway IP
if not subnet_request.allocation_pools: if not subnet_request.allocation_pools:
@@ -71,7 +70,7 @@ class NeutronDbSubnet(ipam_base.Subnet):
else: else:
pools = subnet_request.allocation_pools pools = subnet_request.allocation_pools
# Create IPAM allocation pools # Create IPAM allocation pools
cls.create_allocation_pools(subnet_manager, session, pools, cls.create_allocation_pools(subnet_manager, ctx, pools,
subnet_request.subnet_cidr) subnet_request.subnet_cidr)
return cls(ipam_subnet_id, return cls(ipam_subnet_id,
@@ -89,7 +88,7 @@ class NeutronDbSubnet(ipam_base.Subnet):
:param neutron_subnet_id: neutron subnet identifier. :param neutron_subnet_id: neutron subnet identifier.
""" """
ipam_subnet = ipam_db_api.IpamSubnetManager.load_by_neutron_subnet_id( ipam_subnet = ipam_db_api.IpamSubnetManager.load_by_neutron_subnet_id(
ctx.session, neutron_subnet_id) ctx, neutron_subnet_id)
if not ipam_subnet: if not ipam_subnet:
LOG.error(_LE("IPAM subnet referenced to " LOG.error(_LE("IPAM subnet referenced to "
"Neutron subnet %s does not exist"), "Neutron subnet %s does not exist"),
@@ -132,15 +131,15 @@ class NeutronDbSubnet(ipam_base.Subnet):
self._subnet_id) self._subnet_id)
self._context = ctx self._context = ctx
def _verify_ip(self, session, ip_address): def _verify_ip(self, context, ip_address):
"""Verify whether IP address can be allocated on subnet. """Verify whether IP address can be allocated on subnet.
:param session: database session :param context: neutron api request context
:param ip_address: String representing the IP address to verify :param ip_address: String representing the IP address to verify
:raises: InvalidInput, IpAddressAlreadyAllocated :raises: InvalidInput, IpAddressAlreadyAllocated
""" """
# Ensure that the IP's are unique # Ensure that the IP's are unique
if not self.subnet_manager.check_unique_allocation(session, if not self.subnet_manager.check_unique_allocation(context,
ip_address): ip_address):
raise ipam_exc.IpAddressAlreadyAllocated( raise ipam_exc.IpAddressAlreadyAllocated(
subnet_id=self.subnet_manager.neutron_id, subnet_id=self.subnet_manager.neutron_id,
@@ -152,13 +151,13 @@ class NeutronDbSubnet(ipam_base.Subnet):
subnet_id=self.subnet_manager.neutron_id, subnet_id=self.subnet_manager.neutron_id,
ip=ip_address) ip=ip_address)
def _generate_ip(self, session, prefer_next=False): def _generate_ip(self, context, prefer_next=False):
"""Generate an IP address from the set of available addresses.""" """Generate an IP address from the set of available addresses."""
ip_allocations = netaddr.IPSet() ip_allocations = netaddr.IPSet()
for ipallocation in self.subnet_manager.list_allocations(session): for ipallocation in self.subnet_manager.list_allocations(context):
ip_allocations.add(netaddr.IPAddress(ipallocation.ip_address)) ip_allocations.add(netaddr.IPAddress(ipallocation.ip_address))
for ip_pool in self.subnet_manager.list_pools(session): for ip_pool in self.subnet_manager.list_pools(context):
ip_set = netaddr.IPSet() ip_set = netaddr.IPSet()
ip_set.add(netaddr.IPRange(ip_pool.first_ip, ip_pool.last_ip)) ip_set.add(netaddr.IPRange(ip_pool.first_ip, ip_pool.last_ip))
av_set = ip_set.difference(ip_allocations) av_set = ip_set.difference(ip_allocations)
@@ -183,7 +182,6 @@ class NeutronDbSubnet(ipam_base.Subnet):
# running transaction, which is started on create_port or upper level. # running transaction, which is started on create_port or upper level.
# To be able to do rollback/retry actions correctly ipam driver # To be able to do rollback/retry actions correctly ipam driver
# should not create new nested transaction blocks. # should not create new nested transaction blocks.
session = self._context.session
all_pool_id = None all_pool_id = None
# NOTE(salv-orlando): It would probably better to have a simpler # NOTE(salv-orlando): It would probably better to have a simpler
# model for address requests and just check whether there is a # model for address requests and just check whether there is a
@@ -192,22 +190,24 @@ class NeutronDbSubnet(ipam_base.Subnet):
# This handles both specific and automatic address requests # This handles both specific and automatic address requests
# Check availability of requested IP # Check availability of requested IP
ip_address = str(address_request.address) ip_address = str(address_request.address)
self._verify_ip(session, ip_address) self._verify_ip(self._context, ip_address)
else: else:
prefer_next = isinstance(address_request, prefer_next = isinstance(address_request,
ipam_req.PreferNextAddressRequest) ipam_req.PreferNextAddressRequest)
ip_address, all_pool_id = self._generate_ip(session, prefer_next) ip_address, all_pool_id = self._generate_ip(self._context,
prefer_next)
# Create IP allocation request object # Create IP allocation request object
# The only defined status at this stage is 'ALLOCATED'. # The only defined status at this stage is 'ALLOCATED'.
# More states will be available in the future - e.g.: RECYCLABLE # More states will be available in the future - e.g.: RECYCLABLE
try: try:
with session.begin(subtransactions=True): with self._context.session.begin(subtransactions=True):
# NOTE(kevinbenton): we use a subtransaction to force # NOTE(kevinbenton): we use a subtransaction to force
# a flush here so we can capture DBReferenceErrors due # a flush here so we can capture DBReferenceErrors due
# to concurrent subnet deletions. (galera would deadlock # to concurrent subnet deletions. (galera would deadlock
# later on final commit) # later on final commit)
self.subnet_manager.create_allocation(session, ip_address) self.subnet_manager.create_allocation(self._context,
ip_address)
except db_exc.DBReferenceError: except db_exc.DBReferenceError:
raise n_exc.SubnetNotFound( raise n_exc.SubnetNotFound(
subnet_id=self.subnet_manager.neutron_id) subnet_id=self.subnet_manager.neutron_id)
@@ -215,21 +215,19 @@ class NeutronDbSubnet(ipam_base.Subnet):
def deallocate(self, address): def deallocate(self, address):
# This is almost a no-op because the Neutron DB IPAM driver does not # This is almost a no-op because the Neutron DB IPAM driver does not
# delete IPAllocation objects at every deallocation. The only operation # delete IPAllocation objects at every deallocation. The only
# it performs is to delete an IPRequest entry. # operation it performs is to delete an IPRequest entry.
session = self._context.session
count = self.subnet_manager.delete_allocation( count = self.subnet_manager.delete_allocation(
session, address) self._context, address)
# count can hardly be greater than 1, but it can be 0... # count can hardly be greater than 1, but it can be 0...
if not count: if not count:
raise ipam_exc.IpAddressAllocationNotFound( raise ipam_exc.IpAddressAllocationNotFound(
subnet_id=self.subnet_manager.neutron_id, subnet_id=self.subnet_manager.neutron_id,
ip_address=address) ip_address=address)
def _no_pool_changes(self, session, pools): def _no_pool_changes(self, context, pools):
"""Check if pool updates in db are required.""" """Check if pool updates in db are required."""
db_pools = self.subnet_manager.list_pools(session) db_pools = self.subnet_manager.list_pools(context)
iprange_pools = [netaddr.IPRange(pool.first_ip, pool.last_ip) iprange_pools = [netaddr.IPRange(pool.first_ip, pool.last_ip)
for pool in db_pools] for pool in db_pools]
return pools == iprange_pools return pools == iprange_pools
@@ -238,11 +236,11 @@ class NeutronDbSubnet(ipam_base.Subnet):
# Pools have already been validated in the subnet request object which # Pools have already been validated in the subnet request object which
# was sent to the subnet pool driver. Further validation should not be # was sent to the subnet pool driver. Further validation should not be
# required. # required.
session = self._context.session if self._no_pool_changes(self._context, pools):
if self._no_pool_changes(session, pools):
return return
self.subnet_manager.delete_allocation_pools(session) self.subnet_manager.delete_allocation_pools(self._context)
self.create_allocation_pools(self.subnet_manager, session, pools, cidr) self.create_allocation_pools(self.subnet_manager, self._context, pools,
cidr)
self._pools = pools self._pools = pools
def get_details(self): def get_details(self):
@@ -313,7 +311,7 @@ class NeutronDbPool(subnet_alloc.SubnetAllocator):
IPAM-related data has no foreign key relationships to neutron subnet, IPAM-related data has no foreign key relationships to neutron subnet,
so removing ipam subnet manually so removing ipam subnet manually
""" """
count = ipam_db_api.IpamSubnetManager.delete(self._context.session, count = ipam_db_api.IpamSubnetManager.delete(self._context,
subnet_id) subnet_id)
if count < 1: if count < 1:
LOG.error(_LE("IPAM subnet referenced to " LOG.error(_LE("IPAM subnet referenced to "

View File

@@ -34,7 +34,7 @@ class TestIpamSubnetManager(testlib_api.SqlTestCase):
self.multi_pool = (('1.2.3.2', '1.2.3.12'), ('1.2.3.15', '1.2.3.24')) self.multi_pool = (('1.2.3.2', '1.2.3.12'), ('1.2.3.15', '1.2.3.24'))
self.subnet_manager = db_api.IpamSubnetManager(self.ipam_subnet_id, self.subnet_manager = db_api.IpamSubnetManager(self.ipam_subnet_id,
self.neutron_subnet_id) self.neutron_subnet_id)
self.subnet_manager_id = self.subnet_manager.create(self.ctx.session) self.subnet_manager_id = self.subnet_manager.create(self.ctx)
self.ctx.session.flush() self.ctx.session.flush()
def test_create(self): def test_create(self):
@@ -44,7 +44,7 @@ class TestIpamSubnetManager(testlib_api.SqlTestCase):
self.assertEqual(1, len(subnets)) self.assertEqual(1, len(subnets))
def test_remove(self): def test_remove(self):
count = db_api.IpamSubnetManager.delete(self.ctx.session, count = db_api.IpamSubnetManager.delete(self.ctx,
self.neutron_subnet_id) self.neutron_subnet_id)
self.assertEqual(1, count) self.assertEqual(1, count)
subnets = self.ctx.session.query(db_models.IpamSubnet).filter_by( subnets = self.ctx.session.query(db_models.IpamSubnet).filter_by(
@@ -52,7 +52,7 @@ class TestIpamSubnetManager(testlib_api.SqlTestCase):
self.assertEqual(0, len(subnets)) self.assertEqual(0, len(subnets))
def test_remove_non_existent_subnet(self): def test_remove_non_existent_subnet(self):
count = db_api.IpamSubnetManager.delete(self.ctx.session, count = db_api.IpamSubnetManager.delete(self.ctx,
'non-existent') 'non-existent')
self.assertEqual(0, count) self.assertEqual(0, count)
@@ -61,7 +61,7 @@ class TestIpamSubnetManager(testlib_api.SqlTestCase):
any(pool == (db_pool.first_ip, db_pool.last_ip) for pool in pools)) any(pool == (db_pool.first_ip, db_pool.last_ip) for pool in pools))
def test_create_pool(self): def test_create_pool(self):
self.subnet_manager.create_pool(self.ctx.session, self.subnet_manager.create_pool(self.ctx,
self.single_pool[0], self.single_pool[0],
self.single_pool[1]) self.single_pool[1])
@@ -71,25 +71,25 @@ class TestIpamSubnetManager(testlib_api.SqlTestCase):
def test_check_unique_allocation(self): def test_check_unique_allocation(self):
self.assertTrue(self.subnet_manager.check_unique_allocation( self.assertTrue(self.subnet_manager.check_unique_allocation(
self.ctx.session, self.subnet_ip)) self.ctx, self.subnet_ip))
def test_check_unique_allocation_negative(self): def test_check_unique_allocation_negative(self):
self.subnet_manager.create_allocation(self.ctx.session, self.subnet_manager.create_allocation(self.ctx,
self.subnet_ip) self.subnet_ip)
self.assertFalse(self.subnet_manager.check_unique_allocation( self.assertFalse(self.subnet_manager.check_unique_allocation(
self.ctx.session, self.subnet_ip)) self.ctx, self.subnet_ip))
def test_list_allocations(self): def test_list_allocations(self):
ips = ['1.2.3.4', '1.2.3.6', '1.2.3.7'] ips = ['1.2.3.4', '1.2.3.6', '1.2.3.7']
for ip in ips: for ip in ips:
self.subnet_manager.create_allocation(self.ctx.session, ip) self.subnet_manager.create_allocation(self.ctx, ip)
allocs = self.subnet_manager.list_allocations(self.ctx.session).all() allocs = self.subnet_manager.list_allocations(self.ctx).all()
self.assertEqual(len(ips), len(allocs)) self.assertEqual(len(ips), len(allocs))
for allocation in allocs: for allocation in allocs:
self.assertIn(allocation.ip_address, ips) self.assertIn(allocation.ip_address, ips)
def _test_create_allocation(self): def _test_create_allocation(self):
self.subnet_manager.create_allocation(self.ctx.session, self.subnet_manager.create_allocation(self.ctx,
self.subnet_ip) self.subnet_ip)
alloc = self.ctx.session.query(db_models.IpamAllocation).filter_by( alloc = self.ctx.session.query(db_models.IpamAllocation).filter_by(
ipam_subnet_id=self.ipam_subnet_id).all() ipam_subnet_id=self.ipam_subnet_id).all()
@@ -102,7 +102,7 @@ class TestIpamSubnetManager(testlib_api.SqlTestCase):
def test_delete_allocation(self): def test_delete_allocation(self):
allocs = self._test_create_allocation() allocs = self._test_create_allocation()
self.subnet_manager.delete_allocation(self.ctx.session, self.subnet_manager.delete_allocation(self.ctx,
allocs[0].ip_address) allocs[0].ip_address)
allocs = self.ctx.session.query(db_models.IpamAllocation).filter_by( allocs = self.ctx.session.query(db_models.IpamAllocation).filter_by(

View File

@@ -275,14 +275,14 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
def test__verify_ip_succeeds(self): def test__verify_ip_succeeds(self):
cidr = '10.0.0.0/24' cidr = '10.0.0.0/24'
ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0] ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0]
ipam_subnet._verify_ip(self.ctx.session, '10.0.0.2') ipam_subnet._verify_ip(self.ctx, '10.0.0.2')
def test__verify_ip_not_in_subnet_fails(self): def test__verify_ip_not_in_subnet_fails(self):
cidr = '10.0.0.0/24' cidr = '10.0.0.0/24'
ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0] ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0]
self.assertRaises(ipam_exc.InvalidIpForSubnet, self.assertRaises(ipam_exc.InvalidIpForSubnet,
ipam_subnet._verify_ip, ipam_subnet._verify_ip,
self.ctx.session, self.ctx,
'192.168.0.2') '192.168.0.2')
def test__verify_ip_bcast_and_network_fail(self): def test__verify_ip_bcast_and_network_fail(self):
@@ -290,11 +290,11 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0] ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0]
self.assertRaises(ipam_exc.InvalidIpForSubnet, self.assertRaises(ipam_exc.InvalidIpForSubnet,
ipam_subnet._verify_ip, ipam_subnet._verify_ip,
self.ctx.session, self.ctx,
'10.0.0.255') '10.0.0.255')
self.assertRaises(ipam_exc.InvalidIpForSubnet, self.assertRaises(ipam_exc.InvalidIpForSubnet,
ipam_subnet._verify_ip, ipam_subnet._verify_ip,
self.ctx.session, self.ctx,
'10.0.0.0') '10.0.0.0')
def _allocate_address(self, cidr, ip_version, address_request): def _allocate_address(self, cidr, ip_version, address_request):
@@ -415,7 +415,7 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
last_ip='192.168.10.60')] last_ip='192.168.10.60')]
ipam_subnet.subnet_manager.list_pools = mock.Mock(return_value=pools) ipam_subnet.subnet_manager.list_pools = mock.Mock(return_value=pools)
return ipam_subnet._no_pool_changes(self.ctx.session, new_pools) return ipam_subnet._no_pool_changes(self.ctx, new_pools)
def test__no_pool_changes_negative(self): def test__no_pool_changes_negative(self):
pool_list = [[netaddr.IPRange('192.168.10.2', '192.168.10.254')], pool_list = [[netaddr.IPRange('192.168.10.2', '192.168.10.254')],