Implement nova network API for quantum API 2.0

blueprint improved-nova-quantum-integration

This new network API depends on quantum v2.0 client to connect quantum server
via Quantum server API V2.0. This change implements the minimum set of methods
so that we can boot an instance with ip allocated by quantum server.

Change-Id: I34acbbf84dd8601db8143267c8cc41255a0c6d3f
This commit is contained in:
Yong Sheng Gong
2012-06-25 14:52:58 +08:00
committed by Monty Taylor
parent d335457f48
commit 66bf71b1fc
6 changed files with 812 additions and 2 deletions

View File

@@ -224,6 +224,7 @@ William Wolf <throughnothing@gmail.com>
Yaguang Tang <heut2008@gmail.com>
Ying Chun Guo <daisy.ycguo@gmail.com>
Yoshiaki Tamura <yoshi@midokura.jp>
Yong Sheng Gong <gongysh@cn.ibm.com>
Youcef Laribi <Youcef.Laribi@eu.citrix.com>
Yun Mao <yunmao@gmail.com>
Yun Shen <Yun.Shen@hp.com>

View File

@@ -0,0 +1,58 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
from nova import exception
from nova import flags
from nova.openstack.common import log as logging
from nova.openstack.common import excutils
from quantumclient import client
from quantumclient.v2_0 import client as clientv20
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
def _get_auth_token():
try:
httpclient = client.HTTPClient(
username=FLAGS.quantum_admin_username,
tenant_name=FLAGS.quantum_admin_tenant_name,
password=FLAGS.quantum_admin_password,
auth_url=FLAGS.quantum_admin_auth_url,
timeout=FLAGS.quantum_url_timeout,
auth_strategy=FLAGS.quantum_auth_strategy)
httpclient.authenticate()
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_("_get_auth_token() failed"))
return httpclient.auth_token
def get_client(context):
token = context.auth_token
if not token:
if FLAGS.quantum_auth_strategy:
token = _get_auth_token()
if token:
my_client = clientv20.Client(
endpoint_url=FLAGS.quantum_url,
token=token, timeout=FLAGS.quantum_url_timeout)
else:
my_client = clientv20.Client(
endpoint_url=FLAGS.quantum_url,
auth_strategy=None, timeout=FLAGS.quantum_url_timeout)
return my_client

View File

@@ -0,0 +1,344 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
from nova.db import base
from nova import exception
from nova import flags
from nova.openstack.common import log as logging
from nova.network.api import refresh_cache
from nova.network import model as network_model
from nova.network import quantumv2
from nova.openstack.common import cfg
from nova.openstack.common import excutils
quantum_opts = [
cfg.StrOpt('quantum_url',
default='http://127.0.0.1:9696',
help='URL for connecting to quantum'),
cfg.IntOpt('quantum_url_timeout',
default=30,
help='timeout value for connecting to quantum in seconds'),
cfg.StrOpt('quantum_admin_username',
help='username for connecting to quantum in admin context'),
cfg.StrOpt('quantum_admin_password',
help='password for connecting to quantum in admin context'),
cfg.StrOpt('quantum_admin_tenant_name',
help='tenant name for connecting to quantum in admin context'),
cfg.StrOpt('quantum_admin_auth_url',
default='http://localhost:5000/v2.0',
help='auth url for connecting to quantum in admin context'),
cfg.StrOpt('quantum_auth_strategy',
default='keystone',
help='auth strategy for connecting to '
'quantum in admin context'),
]
FLAGS = flags.FLAGS
FLAGS.register_opts(quantum_opts)
LOG = logging.getLogger(__name__)
class API(base.Base):
"""API for interacting with the quantum 2.x API."""
def setup_networks_on_host(self, context, instance, host=None,
teardown=False):
"""Setup or teardown the network structures."""
def allocate_for_instance(self, context, instance, **kwargs):
"""Allocate all network resources for the instance."""
LOG.debug(_('allocate_for_instance() for %s'),
instance['display_name'])
search_opts = {}
if instance['project_id']:
search_opts.update({"tenant_id": instance['project_id']})
else:
msg = _('empty project id for instance %s')
raise exception.InvalidInput(
reason=msg % instance['display_name'])
data = quantumv2.get_client(context).list_networks(**search_opts)
nets = data.get('networks', [])
created_port_ids = []
for network in nets:
port_req_body = {'port': {'network_id': network['id'],
'admin_state_up': True,
'device_id': instance['uuid'],
'tenant_id': instance['project_id']},
}
try:
created_port_ids.append(
quantumv2.get_client(context).create_port(
port_req_body)['port']['id'])
except Exception:
with excutils.save_and_reraise_exception():
for port_id in created_port_ids:
try:
quantumv2.get_client(context).delete_port(port_id)
except Exception as ex:
msg = _("Fail to delete port %(portid)s with"
" failure: %(exception)s")
LOG.debug(msg, {'portid': port_id,
'exception': ex})
return self.get_instance_nw_info(context, instance, networks=nets)
def deallocate_for_instance(self, context, instance, **kwargs):
"""Deallocate all network resources related to the instance."""
LOG.debug(_('deallocate_for_instance() for %s'),
instance['display_name'])
search_opts = {'device_id': instance['uuid']}
data = quantumv2.get_client(context).list_ports(**search_opts)
ports = data.get('ports', [])
for port in ports:
try:
quantumv2.get_client(context).delete_port(port['id'])
except Exception as ex:
with excutils.save_and_reraise_exception():
msg = _("Fail to delete port %(portid)s with failure:"
"%(exception)s")
LOG.debug(msg, {'portid': port['id'],
'exception': ex})
@refresh_cache
def get_instance_nw_info(self, context, instance, networks=None):
LOG.debug(_('get_instance_nw_info() for %s'),
instance['display_name'])
nw_info = self._build_network_info_model(context, instance, networks)
return network_model.NetworkInfo.hydrate(nw_info)
def add_fixed_ip_to_instance(self, context, instance, network_id):
"""Add a fixed ip to the instance from specified network."""
raise NotImplemented()
def remove_fixed_ip_from_instance(self, context, instance, address):
"""Remove a fixed ip from the instance."""
raise NotImplemented()
def validate_networks(self, context, requested_networks):
"""Validate that the tenant has the requested networks."""
LOG.debug(_('validate_networks() for %s'),
requested_networks)
if not requested_networks:
return
search_opts = {"tenant_id": context.project_id}
net_ids = [net_id for (net_id, _i) in requested_networks]
search_opts['id'] = net_ids
data = quantumv2.get_client(context).list_networks(**search_opts)
nets = data.get('networks', [])
if len(nets) != len(net_ids):
requsted_netid_set = set(net_ids)
returned_netid_set = set([net['id'] for net in nets])
lostid_set = requsted_netid_set - returned_netid_set
id_str = ''
for _id in lostid_set:
id_str = id_str and id_str + ', ' + _id or _id
raise exception.NetworkNotFound(network_id=id_str)
def get_instance_uuids_by_ip_filter(self, context, filters):
"""Return a list of dicts in the form of
[{'instance_uuid': uuid}] that matched the ip filter.
"""
# filters['ip'] is composed as '^%s$' % fixed_ip.replace('.', '\\.')
ip = filters.get('ip')
# we remove ^$\ in the ip filer
if ip[0] == '^':
ip = ip[1:]
if ip[-1] == '$':
ip = ip[:-1]
ip = ip.replace('\\.', '.')
search_opts = {"fixed_ips": {'ip_address': ip}}
data = quantumv2.get_client(context).list_ports(**search_opts)
ports = data.get('ports', [])
return [{'instance_uuid': port['device_id']} for port in ports
if port['device_id']]
@refresh_cache
def associate_floating_ip(self, context, instance,
floating_address, fixed_address,
affect_auto_assigned=False):
"""Associate a floating ip with a fixed ip."""
raise NotImplemented()
def get_all(self, context):
raise NotImplemented()
def get(self, context, network_uuid):
raise NotImplemented()
def delete(self, context, network_uuid):
raise NotImplemented()
def disassociate(self, context, network_uuid):
raise NotImplemented()
def get_fixed_ip(self, context, id):
raise NotImplemented()
def get_fixed_ip_by_address(self, context, address):
raise NotImplemented()
def get_floating_ip(self, context, id):
raise NotImplemented()
def get_floating_ip_pools(self, context):
raise NotImplemented()
def get_floating_ip_by_address(self, context, address):
raise NotImplemented()
def get_floating_ips_by_project(self, context):
raise NotImplemented()
def get_floating_ips_by_fixed_address(self, context, fixed_address):
raise NotImplemented()
def get_instance_id_by_floating_address(self, context, address):
raise NotImplemented()
def get_vifs_by_instance(self, context, instance):
raise NotImplemented()
def get_vif_by_mac_address(self, context, mac_address):
raise NotImplemented()
def allocate_floating_ip(self, context, pool=None):
"""Add a floating ip to a project from a pool."""
raise NotImplemented()
def release_floating_ip(self, context, address,
affect_auto_assigned=False):
"""Remove a floating ip with the given address from a project."""
raise NotImplemented()
@refresh_cache
def disassociate_floating_ip(self, context, instance, address,
affect_auto_assigned=False):
"""Disassociate a floating ip from the fixed ip
it is associated with."""
raise NotImplemented()
def add_network_to_project(self, context, project_id):
"""Force add a network to the project."""
raise NotImplemented()
def _build_network_info_model(self, context, instance, networks=None):
search_opts = {'tenant_id': instance['project_id'],
'device_id': instance['uuid'], }
data = quantumv2.get_client(context).list_ports(**search_opts)
ports = data.get('ports', [])
if not networks:
search_opts = {}
if instance['project_id']:
search_opts.update({"tenant_id": instance['project_id']})
data = quantumv2.get_client(context).list_networks(**search_opts)
networks = data.get('networks', [])
nw_info = network_model.NetworkInfo()
for port in ports:
network_name = None
for net in networks:
if port['network_id'] == net['id']:
network_name = net['name']
break
subnets = self._get_subnets_from_port(context, port)
network_IPs = [network_model.FixedIP(address=ip_address)
for ip_address in [ip['ip_address']
for ip in port['fixed_ips']]]
# TODO(gongysh) get floating_ips for each fixed_ip
for subnet in subnets:
subnet['ips'] = [fixed_ip for fixed_ip in network_IPs
if fixed_ip.is_in_subnet(subnet)]
network = network_model.Network(
id=port['network_id'],
bridge='', # Quantum ignores this field
injected=FLAGS.flat_injected,
label=network_name,
tenant_id=net['tenant_id']
)
network['subnets'] = subnets
nw_info.append(network_model.VIF(
id=port['id'],
address=port['mac_address'],
network=network))
return nw_info
def _get_subnets_from_port(self, context, port):
"""Return the subnets for a given port."""
fixed_ips = port['fixed_ips']
search_opts = {'id': [ip['subnet_id'] for ip in fixed_ips]}
data = quantumv2.get_client(context).list_subnets(**search_opts)
ipam_subnets = data.get('subnets', [])
subnets = []
for subnet in ipam_subnets:
subnet_dict = {'cidr': subnet['cidr'],
'gateway': network_model.IP(
address=subnet['gateway_ip'],
type='gateway'),
}
# TODO(gongysh) deal with dhcp
subnet_object = network_model.Subnet(**subnet_dict)
for dns in subnet.get('dns_nameservers', []):
subnet_object.add_dns(
network_model.IP(address=dns, type='dns'))
# TODO(gongysh) get the routes for this subnet
subnets.append(subnet_object)
return subnets
def get_dns_domains(self, context):
"""Return a list of available dns domains.
These can be used to create DNS entries for floating ips.
"""
raise NotImplemented()
def add_dns_entry(self, context, address, name, dns_type, domain):
"""Create specified DNS entry for address."""
raise NotImplemented()
def modify_dns_entry(self, context, name, address, domain):
"""Create specified DNS entry for address."""
raise NotImplemented()
def delete_dns_entry(self, context, name, domain):
"""Delete the specified dns entry."""
raise NotImplemented()
def delete_dns_domain(self, context, domain):
"""Delete the specified dns domain."""
raise NotImplemented()
def get_dns_entries_by_address(self, context, address, domain):
"""Get entries for address and domain."""
raise NotImplemented()
def get_dns_entries_by_name(self, context, name, domain):
"""Get entries for name and domain."""
raise NotImplemented()
def create_private_dns_domain(self, context, domain, availability_zone):
"""Create a private DNS domain with nova availability zone."""
raise NotImplemented()
def create_public_dns_domain(self, context, domain, project=None):
"""Create a private DNS domain with optional nova project."""
raise NotImplemented()

View File

@@ -291,7 +291,7 @@ class QuantumDeallocationTestCase(QuantumNovaTestCase):
self.net_man.deallocate_ip_address('context', 'net_id', 'project_id',
{'uuid': 1}, 'instance_id')
def test_deallocate_ip_address(self):
def test_deallocate_ip_address_2(self):
ipam = self.mox.CreateMock(melange_ipam_lib.QuantumMelangeIPAMLib)
ipam.get_tenant_id_by_net_id('context', 'net_id', {'uuid': 1},
'project_id').AndRaise(Exception())
@@ -492,7 +492,7 @@ class QuantumManagerTestCase(QuantumNovaTestCase):
net = db.network_get_by_uuid(ctx.elevated(), net_id)
self.assertTrue(net is not None)
self.assertEquals(net['uuid'], net_id)
self.assertTrue(net['host'] is not None)
self.assertTrue(net['host'] != None)
class QuantumNovaMACGenerationTestCase(QuantumNovaTestCase):

View File

@@ -0,0 +1,406 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import mox
from nova import context
from nova import exception
from nova.network import quantumv2
from nova.network import model
from nova.network.quantumv2 import api as quantumapi
from nova.openstack.common import cfg
from nova import test
from nova import utils
from quantumclient.v2_0 import client
FLAGS = cfg.CONF
class MyComparator(mox.Comparator):
def __init__(self, lhs):
self.lhs = lhs
def _com_dict(self, lhs, rhs):
if len(lhs) != len(rhs):
return False
for key, value in lhs.iteritems():
if key not in rhs:
return False
rhs_value = rhs[key]
if not self._com(value, rhs_value):
return False
return True
def _com_list(self, lhs, rhs):
if len(lhs) != len(rhs):
return False
for lhs_value in lhs:
if lhs_value not in rhs:
return False
return True
def _com(self, lhs, rhs):
if lhs is None:
return rhs is None
if isinstance(lhs, dict):
if not isinstance(rhs, dict):
return False
return self._com_dict(lhs, rhs)
if isinstance(lhs, list):
if not isinstance(rhs, list):
return False
return self._com_list(lhs, rhs)
if isinstance(lhs, tuple):
if not isinstance(rhs, tuple):
return False
return self._com_list(lhs, rhs)
return lhs == rhs
def equals(self, rhs):
return self._com(self.lhs, rhs)
def __repr__(self):
return str(self.lhs)
class TestQuantumClient(test.TestCase):
def setUp(self):
super(TestQuantumClient, self).setUp()
def test_withtoken(self):
self.flags(quantum_url='http://anyhost/')
self.flags(quantum_url_timeout=30)
my_context = context.RequestContext('userid',
'my_tenantid',
auth_token='token')
self.mox.StubOutWithMock(client.Client, "__init__")
client.Client.__init__(
endpoint_url=FLAGS.quantum_url,
token=my_context.auth_token,
timeout=FLAGS.quantum_url_timeout).AndReturn(None)
self.mox.ReplayAll()
quantumv2.get_client(my_context)
def test_withouttoken_keystone_connection_error(self):
self.flags(quantum_auth_strategy='keystone')
self.flags(quantum_url='http://anyhost/')
my_context = context.RequestContext('userid', 'my_tenantid')
self.assertRaises(Exception,
quantumv2.get_client,
my_context)
def test_withouttoken_keystone_not_auth(self):
# self.flags(quantum_auth_strategy=None) fail to work
old_quantum_auth_strategy = FLAGS.quantum_auth_strategy
setattr(FLAGS, 'quantum_auth_strategy', None)
self.flags(quantum_url='http://anyhost/')
self.flags(quantum_url_timeout=30)
my_context = context.RequestContext('userid', 'my_tenantid')
self.mox.StubOutWithMock(client.Client, "__init__")
client.Client.__init__(
endpoint_url=FLAGS.quantum_url,
auth_strategy=None,
timeout=FLAGS.quantum_url_timeout).AndReturn(None)
self.mox.ReplayAll()
try:
quantumv2.get_client(my_context)
finally:
setattr(FLAGS, 'quantum_auth_strategy',
old_quantum_auth_strategy)
class TestQuantumv2(test.TestCase):
def setUp(self):
super(TestQuantumv2, self).setUp()
self.mox.StubOutWithMock(quantumv2, 'get_client')
self.moxed_client = self.mox.CreateMock(client.Client)
quantumv2.get_client(mox.IgnoreArg()).MultipleTimes().AndReturn(
self.moxed_client)
self.context = context.RequestContext('userid', 'my_tenantid')
setattr(self.context,
'auth_token',
'bff4a5a6b9eb4ea2a6efec6eefb77936')
self.instance = {'project_id': '9d049e4b60b64716978ab415e6fbd5c0',
'uuid': str(utils.gen_uuid()),
'display_name': 'test_instance'}
self.nets1 = [{'id': 'my_netid1',
'name': 'my_netname1',
'tenant_id': 'my_tenantid'}]
self.nets2 = []
self.nets2.append(self.nets1[0])
self.nets2.append({'id': 'my_netid2',
'name': 'my_netname2',
'tenant_id': 'my_tenantid'})
self.port_data1 = [{'network_id': 'my_netid1',
'device_id': 'device_id1',
'id': 'my_portid1',
'fixed_ips': [{'ip_address': '10.0.1.2',
'subnet_id': 'my_subid1'}],
'mac_address': 'my_mac1', }]
self.port_data2 = []
self.port_data2.append(self.port_data1[0])
self.port_data2.append({'network_id': 'my_netid2',
'device_id': 'device_id2',
'id': 'my_portid2',
'fixed_ips': [{'ip_address': '10.0.2.2',
'subnet_id': 'my_subid2'}],
'mac_address': 'my_mac2', })
self.subnet_data1 = [{'cidr': '10.0.1.0/24',
'gateway_ip': '10.0.1.1',
'dns_nameservers': ['8.8.1.1', '8.8.1.2']}]
self.subnet_data2 = []
self.subnet_data2.append({'cidr': '10.0.2.0/24',
'gateway_ip': '10.0.2.1',
'dns_nameservers': ['8.8.2.1', '8.8.2.2']})
def tearDown(self):
try:
self.mox.UnsetStubs()
self.mox.VerifyAll()
finally:
FLAGS.reset()
def _verify_nw_info(self, nw_inf, index=0):
id_suffix = index + 1
self.assertEquals('10.0.%s.2' % id_suffix,
nw_inf.fixed_ips()[index]['address'])
self.assertEquals('my_netname%s' % id_suffix,
nw_inf[index]['network']['label'])
self.assertEquals('my_portid%s' % id_suffix, nw_inf[index]['id'])
self.assertEquals('my_mac%s' % id_suffix, nw_inf[index]['address'])
self.assertEquals('10.0.%s.0/24' % id_suffix,
nw_inf[index]['network']['subnets'][0]['cidr'])
self.assertTrue(model.IP(address='8.8.%s.1' % id_suffix) in
nw_inf[index]['network']['subnets'][0]['dns'])
def _get_instance_nw_info(self, number):
api = quantumapi.API()
self.mox.StubOutWithMock(api.db, 'instance_info_cache_update')
api.db.instance_info_cache_update(mox.IgnoreArg(),
self.instance['uuid'],
mox.IgnoreArg())
port_data = number == 1 and self.port_data1 or self.port_data2
self.moxed_client.list_ports(
tenant_id=self.instance['project_id'],
device_id=self.instance['uuid']).AndReturn(
{'ports': port_data})
nets = number == 1 and self.nets1 or self.nets2
self.moxed_client.list_networks(
tenant_id=self.instance['project_id']).AndReturn(
{'networks': nets})
for i in xrange(1, number + 1):
subnet_data = i == 1 and self.subnet_data1 or self.subnet_data2
self.moxed_client.list_subnets(
id=mox.SameElementsAs(['my_subid%s' % i])).AndReturn(
{'subnets': subnet_data})
self.mox.ReplayAll()
nw_inf = api.get_instance_nw_info(self.context, self.instance)
for i in xrange(0, number):
self._verify_nw_info(nw_inf, i)
def test_get_instance_nw_info_1(self):
"""Test to get one port in one network and subnet."""
self._get_instance_nw_info(1)
def test_get_instance_nw_info_2(self):
"""Test to get one port in each of two networks and subnets."""
self._get_instance_nw_info(2)
def test_get_instance_nw_info_with_nets(self):
"""Test get instance_nw_info with networks passed in."""
api = quantumapi.API()
self.mox.StubOutWithMock(api.db, 'instance_info_cache_update')
api.db.instance_info_cache_update(
mox.IgnoreArg(),
self.instance['uuid'], mox.IgnoreArg())
self.moxed_client.list_ports(
tenant_id=self.instance['project_id'],
device_id=self.instance['uuid']).AndReturn(
{'ports': self.port_data1})
self.moxed_client.list_subnets(
id=mox.SameElementsAs(['my_subid1'])).AndReturn(
{'subnets': self.subnet_data1})
self.mox.ReplayAll()
nw_inf = api.get_instance_nw_info(self.context,
self.instance,
networks=self.nets1)
self._verify_nw_info(nw_inf, 0)
def _allocate_for_instance(self, number):
api = quantumapi.API()
self.mox.StubOutWithMock(api, 'get_instance_nw_info')
nets = number == 1 and self.nets1 or self.nets2
api.get_instance_nw_info(mox.IgnoreArg(),
self.instance,
networks=nets).AndReturn(None)
self.moxed_client.list_networks(
tenant_id=self.instance['project_id']).AndReturn(
{'networks': nets})
for network in nets:
port_req_body = {
'port': {
'network_id': network['id'],
'admin_state_up': True,
'device_id': self.instance['uuid'],
'tenant_id': self.instance['project_id'],
},
}
port = {'id': 'portid_' + network['id']}
self.moxed_client.create_port(
MyComparator(port_req_body)).AndReturn({'port': port})
self.mox.ReplayAll()
api.allocate_for_instance(self.context, self.instance)
def test_allocate_for_instance_1(self):
"""Allocate one port in one network env."""
self._allocate_for_instance(1)
def test_allocate_for_instance_2(self):
"""Allocate one port in two networks env."""
self._allocate_for_instance(2)
def test_allocate_for_instance_ex1(self):
"""verify we will delete created ports
if we fail to allocate all net resources.
Mox to raise exception when creating a second port.
In this case, the code should delete the first created port.
"""
api = quantumapi.API()
self.moxed_client.list_networks(
tenant_id=self.instance['project_id']).AndReturn(
{'networks': self.nets2})
index = 0
for network in self.nets2:
port_req_body = {
'port': {
'network_id': network['id'],
'admin_state_up': True,
'device_id': self.instance['uuid'],
'tenant_id': self.instance['project_id'],
},
}
port = {'id': 'portid_' + network['id']}
if index == 0:
self.moxed_client.create_port(
MyComparator(port_req_body)).AndReturn({'port': port})
else:
self.moxed_client.create_port(
MyComparator(port_req_body)).AndRaise(
Exception("fail to create port"))
index += 1
self.moxed_client.delete_port('portid_' + self.nets2[0]['id'])
self.mox.ReplayAll()
self.assertRaises(Exception, api.allocate_for_instance,
self.context, self.instance)
def test_allocate_for_instance_ex2(self):
"""verify we have no port to delete
if we fail to allocate the first net resource.
Mox to raise exception when creating the first port.
In this case, the code should not delete any ports.
"""
api = quantumapi.API()
self.moxed_client.list_networks(
tenant_id=self.instance['project_id']).AndReturn(
{'networks': self.nets2})
port_req_body = {
'port': {
'network_id': self.nets2[0]['id'],
'admin_state_up': True,
'device_id': self.instance['uuid'],
'tenant_id': self.instance['project_id'],
},
}
self.moxed_client.create_port(
MyComparator(port_req_body)).AndRaise(
Exception("fail to create port"))
self.mox.ReplayAll()
self.assertRaises(Exception, api.allocate_for_instance,
self.context, self.instance)
def _deallocate_for_instance(self, number):
port_data = number == 1 and self.port_data1 or self.port_data2
self.moxed_client.list_ports(
device_id=self.instance['uuid']).AndReturn(
{'ports': port_data})
for port in port_data:
self.moxed_client.delete_port(port['id'])
self.mox.ReplayAll()
api = quantumapi.API()
api.deallocate_for_instance(self.context, self.instance)
def test_deallocate_for_instance_1(self):
"""Test to deallocate in one port env."""
self._deallocate_for_instance(1)
def test_deallocate_for_instance_2(self):
"""Test to deallocate in two ports env."""
self._deallocate_for_instance(2)
def test_validate_networks(self):
requested_networks = [('my_netid1', 'test'), ('my_netid2', 'test2')]
self.moxed_client.list_networks(
id=mox.SameElementsAs(['my_netid1', 'my_netid2']),
tenant_id=self.context.project_id).AndReturn(
{'networks': self.nets2})
self.mox.ReplayAll()
api = quantumapi.API()
api.validate_networks(self.context, requested_networks)
def test_validate_networks_ex_1(self):
requested_networks = [('my_netid1', 'test'), ('my_netid2', 'test2')]
self.moxed_client.list_networks(
id=mox.SameElementsAs(['my_netid1', 'my_netid2']),
tenant_id=self.context.project_id).AndReturn(
{'networks': self.nets1})
self.mox.ReplayAll()
api = quantumapi.API()
try:
api.validate_networks(self.context, requested_networks)
except exception.NetworkNotFound as ex:
self.assertTrue("my_netid2" in str(ex))
def test_validate_networks_ex_2(self):
requested_networks = [('my_netid1', 'test'),
('my_netid2', 'test2'),
('my_netid3', 'test3')]
self.moxed_client.list_networks(
id=mox.SameElementsAs(['my_netid1', 'my_netid2', 'my_netid3']),
tenant_id=self.context.project_id).AndReturn(
{'networks': self.nets1})
self.mox.ReplayAll()
api = quantumapi.API()
try:
api.validate_networks(self.context, requested_networks)
except exception.NetworkNotFound as ex:
self.assertTrue("my_netid2, my_netid3" in str(ex))
def test_get_instance_uuids_by_ip_filter(self):
filters = {'ip': '^10\\.0\\.1\\.2$'}
self.moxed_client.list_ports(
fixed_ips=MyComparator({'ip_address': '10.0.1.2'})).AndReturn(
{'ports': self.port_data2})
self.mox.ReplayAll()
api = quantumapi.API()
result = api.get_instance_uuids_by_ip_filter(self.context, filters)
self.assertEquals('device_id1', result[0]['instance_uuid'])
self.assertEquals('device_id2', result[1]['instance_uuid'])

View File

@@ -24,3 +24,4 @@ Babel>=0.9.6
iso8601>=0.1.4
httplib2
setuptools_git>=0.4
python-quantumclient>=0.1,<0.2