From 0a5c5f8130fce42b1edcfb67c702e25f51aefa13 Mon Sep 17 00:00:00 2001 From: Jesse Andrews Date: Sat, 12 Jun 2010 18:24:27 -0700 Subject: [PATCH 01/10] implement image serving in objectstore so nginx isn't required in development reviewed by yosh --- nova/objectstore/handler.py | 26 ++++++++++++++++++++++++++ nova/objectstore/image.py | 4 ++++ 2 files changed, 30 insertions(+) diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py index a7fff12fc227..3f00bb0c4fef 100644 --- a/nova/objectstore/handler.py +++ b/nova/objectstore/handler.py @@ -71,6 +71,7 @@ class Application(web.Application): def __init__(self, user_manager): web.Application.__init__(self, [ (r"/", RootHandler), + (r"/_images/(.+)", ImageDownloadHandler), (r"/_images/", ImageHandler), (r"/([^/]+)/(.+)", ObjectHandler), (r"/([^/]+)/", BucketHandler), @@ -224,6 +225,31 @@ class ObjectHandler(BaseRequestHandler): self.finish() +class ImageDownloadHandler(BaseRequestHandler): + SUPPORTED_METHODS = ("GET", ) + + @catch_nova_exceptions + def get(self, image_id): + """ send the decrypted image file + + streaming content through python is slow and should only be used + in development mode. You should serve files via a web server + in production. + """ + + self.set_header("Content-Type", "application/octet-stream") + + READ_SIZE = 64*1024 + + img = image.Image(image_id) + with open(img.image_path, 'rb') as fp: + s = fp.read(READ_SIZE) + while s: + self.write(s) + s = fp.read(READ_SIZE) + + self.finish() + class ImageHandler(BaseRequestHandler): SUPPORTED_METHODS = ("POST", "PUT", "GET", "DELETE") diff --git a/nova/objectstore/image.py b/nova/objectstore/image.py index 892ada00cd6f..b8dae4077f7a 100644 --- a/nova/objectstore/image.py +++ b/nova/objectstore/image.py @@ -47,6 +47,10 @@ class Image(object): not os.path.isdir(self.path): raise exception.NotFound + @property + def image_path(self): + return os.path.join(self.path, 'image') + def delete(self): for fn in ['info.json', 'image']: try: From aabc316aa734107e82a6dd0317028f9a254f24bc Mon Sep 17 00:00:00 2001 From: Jesse Andrews Date: Tue, 15 Jun 2010 10:42:34 -0700 Subject: [PATCH 02/10] first go at moving from tornado to twisted --- nova/objectstore/handler.py | 169 ++++++++++++++++++++++++------------ 1 file changed, 114 insertions(+), 55 deletions(-) diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py index 3f00bb0c4fef..d9369afbf855 100644 --- a/nova/objectstore/handler.py +++ b/nova/objectstore/handler.py @@ -33,18 +33,26 @@ S3 client with this module:: """ import datetime -import os -import urllib -import json import logging +import json import multiprocessing +import os +import re +import time +import urllib from nova import vendor -from tornado import escape, web + +from twisted.web import resource +from twisted.web import server +from twisted.internet import reactor + +from tornado import escape # FIXME(ja): move to non-tornado escape from nova import exception from nova import flags +from nova.auth import users from nova.endpoint import api from nova.objectstore import bucket from nova.objectstore import image @@ -53,54 +61,102 @@ from nova.objectstore import image FLAGS = flags.FLAGS -def catch_nova_exceptions(target): - # FIXME: find a way to wrap all handlers in the web.Application.__init__ ? - def wrapper(*args, **kwargs): - try: - return target(*args, **kwargs) - except exception.NotFound: - raise web.HTTPError(404) - except exception.NotAuthorized: - raise web.HTTPError(403) - - return wrapper - - -class Application(web.Application): +class Application(resource.Resource): """Implementation of an S3-like storage server based on local files.""" - def __init__(self, user_manager): - web.Application.__init__(self, [ - (r"/", RootHandler), + + isLeaf = True + + def __init__(self): + # fixme(ja): optomize by compiling regexps? + self.handlers = [ (r"/_images/(.+)", ImageDownloadHandler), (r"/_images/", ImageHandler), (r"/([^/]+)/(.+)", ObjectHandler), (r"/([^/]+)/", BucketHandler), - ]) + (r"/", RootHandler), + ] self.buckets_path = os.path.abspath(FLAGS.buckets_path) self.images_path = os.path.abspath(FLAGS.images_path) if not os.path.exists(self.buckets_path): - raise Exception("buckets_path does not exist") + raise Exception("buckets_path %s does not exist" % self.buckets_path) if not os.path.exists(self.images_path): - raise Exception("images_path does not exist") - self.user_manager = user_manager + raise Exception("images_path %s does not exist" % self.images_path) + + def render_GET(self, request): + return self.route(request) + + def render_PUT(self, request): + return self.route(request) + + def render_POST(self, request): + return self.route(request) + + def render_DELETE(self, request): + return self.route(request) + + def route(self, request): + start_time = time.time() + + for regexp, handler in self.handlers: + match = re.search(regexp, request.path) + if match: + try: + print 'match: %s' % request.path + func = getattr(handler(request), request.method.lower()) + #print 'func: %s' % func + params = match.groups() + #print 'args: %s' % str(params) + response = func(*params) + #print 'resp: %s' % response + except exception.NotFound: + request.setResponseCode(404) + response = 'Not Found' + except exception.NotAuthorized: + request.setResponseCode(403) + response = 'Not Authorized' + except Exception, e: + request.setResponseCode(500) + response = 'Internal Error: %s' % e + break + + duration = (time.time() - start_time) * 1000 + logging.info("%d %s %s %0.1fms %s" % (request.code, request.method, request.uri, + duration, str(handler).split('.')[-1].split("'")[0])) + return response -class BaseRequestHandler(web.RequestHandler): +class BaseRequestHandler(object): SUPPORTED_METHODS = ("PUT", "GET", "DELETE", "HEAD") + def __init__(self, request): + self.request = request + + def set_header(self, name, value): + self.request.setHeader(name, value) + + def write(self, content): + self.request.write(content) + + def finish(self, content=None): + if content: + self.request.write(content) + self.request.finish() + @property def context(self): if not hasattr(self, '_context'): try: # Authorization Header format: 'AWS :' - access, sep, secret = self.request.headers['Authorization'].split(' ')[1].rpartition(':') - (user, project) = self.application.user_manager.authenticate(access, secret, {}, self.request.method, self.request.host, self.request.path, False) + access, sep, secret = self.request.getHeader('Authorization').split(' ')[1].rpartition(':') + um = users.UserManager.instance() + print 'um %s' % um + (user, project) = um.authenticate(access, secret, {}, self.request.method, self.request.host, self.request.uri, False) # FIXME: check signature here! self._context = api.APIRequestContext(self, user, project) except exception.Error, ex: logging.debug("Authentication Failure: %s" % ex) - raise web.HTTPError(403) + raise exception.NotAuthorized return self._context def render_xml(self, value): @@ -138,6 +194,7 @@ class BaseRequestHandler(web.RequestHandler): class RootHandler(BaseRequestHandler): + def get(self): buckets = [b for b in bucket.Bucket.all() if b.is_authorized(self.context)] @@ -147,14 +204,13 @@ class RootHandler(BaseRequestHandler): class BucketHandler(BaseRequestHandler): - @catch_nova_exceptions def get(self, bucket_name): logging.debug("List keys for bucket %s" % (bucket_name)) bucket_object = bucket.Bucket(bucket_name) if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + raise exception.NotAuthorized prefix = self.get_argument("prefix", u"") marker = self.get_argument("marker", u"") @@ -164,19 +220,21 @@ class BucketHandler(BaseRequestHandler): results = bucket_object.list_keys(prefix=prefix, marker=marker, max_keys=max_keys, terse=terse) self.render_xml({"ListBucketResult": results}) - @catch_nova_exceptions def put(self, bucket_name): logging.debug("Creating bucket %s" % (bucket_name)) + try: + print 'user is %s' % self.context + except Exception, e: + logging.exception(e) bucket.Bucket.create(bucket_name, self.context) self.finish() - @catch_nova_exceptions def delete(self, bucket_name): logging.debug("Deleting bucket %s" % (bucket_name)) bucket_object = bucket.Bucket(bucket_name) if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + raise exception.NotAuthorized bucket_object.delete() self.set_status(204) @@ -184,14 +242,13 @@ class BucketHandler(BaseRequestHandler): class ObjectHandler(BaseRequestHandler): - @catch_nova_exceptions def get(self, bucket_name, object_name): logging.debug("Getting object: %s / %s" % (bucket_name, object_name)) bucket_object = bucket.Bucket(bucket_name) if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + raise exception.NotAuthorized obj = bucket_object[urllib.unquote(object_name)] self.set_header("Content-Type", "application/unknown") @@ -199,26 +256,28 @@ class ObjectHandler(BaseRequestHandler): self.set_header("Etag", '"' + obj.md5 + '"') self.finish(obj.read()) - @catch_nova_exceptions def put(self, bucket_name, object_name): logging.debug("Putting object: %s / %s" % (bucket_name, object_name)) bucket_object = bucket.Bucket(bucket_name) if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + raise exception.NotAuthorized key = urllib.unquote(object_name) - bucket_object[key] = self.request.body + print 'seeking' + self.request.content.seek(0, 0) + print 'writing' + bucket_object[key] = self.request.content.read() + print 'etag %s' % bucket_object[key].md5 self.set_header("Etag", '"' + bucket_object[key].md5 + '"') self.finish() - @catch_nova_exceptions def delete(self, bucket_name, object_name): logging.debug("Deleting object: %s / %s" % (bucket_name, object_name)) bucket_object = bucket.Bucket(bucket_name) if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + raise exception.NotAuthorized del bucket_object[urllib.unquote(object_name)] self.set_status(204) @@ -228,7 +287,6 @@ class ObjectHandler(BaseRequestHandler): class ImageDownloadHandler(BaseRequestHandler): SUPPORTED_METHODS = ("GET", ) - @catch_nova_exceptions def get(self, image_id): """ send the decrypted image file @@ -239,21 +297,21 @@ class ImageDownloadHandler(BaseRequestHandler): self.set_header("Content-Type", "application/octet-stream") - READ_SIZE = 64*1024 + READ_SIZE = 1024*1024 img = image.Image(image_id) with open(img.image_path, 'rb') as fp: - s = fp.read(READ_SIZE) - while s: - self.write(s) - s = fp.read(READ_SIZE) + chunk = fp.read(READ_SIZE) + while chunk: + self.write(chunk) + self.flush() + chunk = fp.read(READ_SIZE) self.finish() class ImageHandler(BaseRequestHandler): SUPPORTED_METHODS = ("POST", "PUT", "GET", "DELETE") - @catch_nova_exceptions def get(self): """ returns a json listing of all images that a user has permissions to see """ @@ -262,7 +320,6 @@ class ImageHandler(BaseRequestHandler): self.finish(json.dumps([i.metadata for i in images])) - @catch_nova_exceptions def put(self): """ create a new registered image """ @@ -272,20 +329,19 @@ class ImageHandler(BaseRequestHandler): image_path = os.path.join(FLAGS.images_path, image_id) if not image_path.startswith(FLAGS.images_path) or \ os.path.exists(image_path): - raise web.HTTPError(403) + raise exception.NotAuthorized bucket_object = bucket.Bucket(image_location.split("/")[0]) manifest = image_location[len(image_location.split('/')[0])+1:] if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + raise exception.NotAuthorized p = multiprocessing.Process(target=image.Image.create,args= (image_id, image_location, self.context)) p.start() self.finish() - @catch_nova_exceptions def post(self): """ update image attributes: public/private """ @@ -295,21 +351,24 @@ class ImageHandler(BaseRequestHandler): image_object = image.Image(image_id) if not image.is_authorized(self.context): - raise web.HTTPError(403) + raise exception.NotAuthorized image_object.set_public(operation=='add') self.finish() - @catch_nova_exceptions def delete(self): """ delete a registered image """ image_id = self.get_argument("image_id", u"") image_object = image.Image(image_id) if not image.is_authorized(self.context): - raise web.HTTPError(403) + raise exception.NotAuthorized image_object.delete() self.set_status(204) + +factory = server.Site(Application()) +reactor.listenTCP(3333, factory) +reactor.run() From b81b0f2ecf3ef9bcba71a581ccd0ed3729398fba Mon Sep 17 00:00:00 2001 From: Jesse Andrews Date: Sun, 20 Jun 2010 15:08:25 -0700 Subject: [PATCH 03/10] update spacing --- docs/getting.started.rst | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/docs/getting.started.rst b/docs/getting.started.rst index 9d7808a27093..63e4359065cb 100644 --- a/docs/getting.started.rst +++ b/docs/getting.started.rst @@ -1,12 +1,12 @@ .. Copyright [2010] [Anso Labs, LLC] - + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -67,19 +67,19 @@ Installation # ON THE CLOUD CONTROLLER apt-get install -y rabbitmq-server dnsmasq nginx # build redis from 2.0.0-rc1 source - # setup ldap (slap.sh as root will remove ldap and reinstall it) - NOVA_PATH/nova/auth/slap.sh + # setup ldap (slap.sh as root will remove ldap and reinstall it) + NOVA_PATH/nova/auth/slap.sh /etc/init.d/rabbitmq-server start # ON VOLUME NODE: - apt-get install -y vblade-persist + apt-get install -y vblade-persist # ON THE COMPUTE NODE: apt-get install -y kpartx kvm # optional packages - apt-get install -y euca2ools - + apt-get install -y euca2ools + Configuration --------------- @@ -90,10 +90,10 @@ ON CLOUD CONTROLLER :: iptables -t nat -A PREROUTING -s 0.0.0.0/0 -d 169.254.169.254/32 -p tcp -m tcp --dport 80 -j DNAT --to-destination $IP:8773 - iptables --table nat --append POSTROUTING --out-interface $PUBLICIFACE -j MASQUERADE + iptables --table nat --append POSTROUTING --out-interface $PUBLICIFACE -j MASQUERADE -* Configure NginX proxy (/etc/nginx/sites-enabled/default) +* Configure NginX proxy (/etc/nginx/sites-enabled/default) :: @@ -137,8 +137,8 @@ Launch servers Launch nova components -* api_worker -* s3_worker -* node_worker -* storage_worker +* nova-api +* nova-compute +* nova-objectstore +* nova-volume From c3505507a5b4c0d88164e8e6dfea405c902004ff Mon Sep 17 00:00:00 2001 From: Jay Pipes Date: Fri, 16 Jul 2010 10:03:22 -0500 Subject: [PATCH 04/10] Adds a flag to redirect STDERR when running run_tests.py. Defaults to a truncate-on-write logfile named run_tests.err.log. Adds ignore rule for generated errlog file. --- .bzrignore | 1 + run_tests.py | 9 +++++++++ 2 files changed, 10 insertions(+) create mode 100644 .bzrignore diff --git a/.bzrignore b/.bzrignore new file mode 100644 index 000000000000..93fc868a3a68 --- /dev/null +++ b/.bzrignore @@ -0,0 +1 @@ +run_tests.err.log diff --git a/run_tests.py b/run_tests.py index bd1587d439ab..53d4d7ebe591 100644 --- a/run_tests.py +++ b/run_tests.py @@ -39,6 +39,7 @@ Due to our use of multiprocessing it we frequently get some ignorable """ import __main__ +import os import sys from nova import vendor @@ -66,6 +67,9 @@ FLAGS = flags.FLAGS flags.DEFINE_bool('flush_db', True, 'Flush the database before running fake tests') +flags.DEFINE_string('tests_stderr', 'run_tests.err.log', + 'Path to where to pipe STDERR during test runs. Default = "run_tests.err.log"') + if __name__ == '__main__': OptionsClass = twistd.WrapTwistedOptions(trial_script.Options) config = OptionsClass() @@ -85,6 +89,11 @@ if __name__ == '__main__': else: from nova.tests.real_flags import * + # Establish redirect for STDERR + sys.stderr.flush() + err = open(FLAGS.tests_stderr, 'w+', 0) + os.dup2(err.fileno(), sys.stderr.fileno()) + if len(argv) == 1 and len(config['tests']) == 0: # If no tests were specified run the ones imported in this file # NOTE(termie): "tests" is not a flag, just some Trial related stuff From 4b15a647f8153c493fb697eebc4ab17412142d67 Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Fri, 16 Jul 2010 13:39:17 -0500 Subject: [PATCH 05/10] Make S3 API handler more idiomatic Twisted Web-y. --- nova/objectstore/handler.py | 385 +++++++++++++++--------------------- 1 file changed, 156 insertions(+), 229 deletions(-) diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py index 12ff9763b00a..50c56c83c79a 100644 --- a/nova/objectstore/handler.py +++ b/nova/objectstore/handler.py @@ -1,10 +1,11 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. # -# Copyright 2009 Facebook +# Copyright 2010 OpenStack LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Copyright 2009 Facebook # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -48,8 +49,8 @@ import urllib from nova import vendor -from twisted.web import resource -from twisted.web import server +from twisted.web.resource import Resource +from twisted.web import server, static from twisted.internet import reactor from tornado import escape # FIXME(ja): move to non-tornado escape @@ -63,273 +64,197 @@ from nova.objectstore import image FLAGS = flags.FLAGS +FLAGS.fake_users = True +def render_xml(request, value): + assert isinstance(value, dict) and len(value) == 1 + request.setHeader("Content-Type", "application/xml; charset=UTF-8") -class Application(resource.Resource): + name = value.keys()[0] + request.write('\n') + request.write('<' + escape.utf8(name) + + ' xmlns="http://doc.s3.amazonaws.com/2006-03-01">') + _render_parts(value.values()[0], request.write) + request.write('') + request.finish() + +def finish(request, content=None): + if content: + request.write(content) + request.finish() + +def _render_parts(value, write_cb): + if isinstance(value, basestring): + write_cb(escape.xhtml_escape(value)) + elif isinstance(value, int) or isinstance(value, long): + write_cb(str(value)) + elif isinstance(value, datetime.datetime): + write_cb(value.strftime("%Y-%m-%dT%H:%M:%S.000Z")) + elif isinstance(value, dict): + for name, subvalue in value.iteritems(): + if not isinstance(subvalue, list): + subvalue = [subvalue] + for subsubvalue in subvalue: + write_cb('<' + escape.utf8(name) + '>') + _render_parts(subsubvalue, write_cb) + write_cb('') + else: + raise Exception("Unknown S3 value type %r", value) + +def get_argument(request, key, default_value): + if key in request.args: + return request.args[key][0] + return default_value + +def get_context(request): + try: + # Authorization Header format: 'AWS :' + access, sep, secret = request.getHeader('Authorization').split(' ')[1].rpartition(':') + um = users.UserManager.instance() + print 'um %s' % um + (user, project) = um.authenticate(access, secret, {}, request.method, request.host, request.uri, False) + # FIXME: check signature here! + return api.APIRequestContext(None, user, project) + except exception.Error, ex: + logging.debug("Authentication Failure: %s" % ex) + raise exception.NotAuthorized + +class S3(Resource): """Implementation of an S3-like storage server based on local files.""" + def getChild(self, name, request): + request.context = get_context(request) - isLeaf = True - - def __init__(self): - # fixme(ja): optomize by compiling regexps? - self.handlers = [ - (r"/_images/(.+)", ImageDownloadHandler), - (r"/_images/", ImageHandler), - (r"/([^/]+)/(.+)", ObjectHandler), - (r"/([^/]+)/", BucketHandler), - (r"/", RootHandler), - ] - self.buckets_path = os.path.abspath(FLAGS.buckets_path) - self.images_path = os.path.abspath(FLAGS.images_path) - - if not os.path.exists(self.buckets_path): - raise Exception("buckets_path %s does not exist" % self.buckets_path) - if not os.path.exists(self.images_path): - raise Exception("images_path %s does not exist" % self.images_path) + if name == '': + return self + elif name == '_images': + return ImageResource() + else: + return BucketResource(name) def render_GET(self, request): - return self.route(request) + buckets = [b for b in bucket.Bucket.all() if b.is_authorized(request.context)] - def render_PUT(self, request): - return self.route(request) - - def render_POST(self, request): - return self.route(request) - - def render_DELETE(self, request): - return self.route(request) - - def route(self, request): - start_time = time.time() - - for regexp, handler in self.handlers: - match = re.search(regexp, request.path) - if match: - try: - print 'match: %s' % request.path - func = getattr(handler(request), request.method.lower()) - #print 'func: %s' % func - params = match.groups() - #print 'args: %s' % str(params) - response = func(*params) - #print 'resp: %s' % response - except exception.NotFound: - request.setResponseCode(404) - response = 'Not Found' - except exception.NotAuthorized: - request.setResponseCode(403) - response = 'Not Authorized' - except Exception, e: - request.setResponseCode(500) - response = 'Internal Error: %s' % e - break - - duration = (time.time() - start_time) * 1000 - logging.info("%d %s %s %0.1fms %s" % (request.code, request.method, request.uri, - duration, str(handler).split('.')[-1].split("'")[0])) - return response - - -class BaseRequestHandler(object): - SUPPORTED_METHODS = ("PUT", "GET", "DELETE", "HEAD") - - def __init__(self, request): - self.request = request - - def set_header(self, name, value): - self.request.setHeader(name, value) - - def write(self, content): - self.request.write(content) - - def finish(self, content=None): - if content: - self.request.write(content) - self.request.finish() - - @property - def context(self): - if not hasattr(self, '_context'): - try: - # Authorization Header format: 'AWS :' - access, sep, secret = self.request.getHeader('Authorization').split(' ')[1].rpartition(':') - um = users.UserManager.instance() - print 'um %s' % um - (user, project) = um.authenticate(access, secret, {}, self.request.method, self.request.host, self.request.uri, False) - # FIXME: check signature here! - self._context = api.APIRequestContext(self, user, project) - except exception.Error, ex: - logging.debug("Authentication Failure: %s" % ex) - raise exception.NotAuthorized - return self._context - - def render_xml(self, value): - assert isinstance(value, dict) and len(value) == 1 - self.set_header("Content-Type", "application/xml; charset=UTF-8") - name = value.keys()[0] - parts = [] - parts.append('<' + escape.utf8(name) + - ' xmlns="http://doc.s3.amazonaws.com/2006-03-01">') - self._render_parts(value.values()[0], parts) - parts.append('') - self.finish('\n' + - ''.join(parts)) - - def _render_parts(self, value, parts=[]): - if isinstance(value, basestring): - parts.append(escape.xhtml_escape(value)) - elif isinstance(value, int) or isinstance(value, long): - parts.append(str(value)) - elif isinstance(value, datetime.datetime): - parts.append(value.strftime("%Y-%m-%dT%H:%M:%S.000Z")) - elif isinstance(value, dict): - for name, subvalue in value.iteritems(): - if not isinstance(subvalue, list): - subvalue = [subvalue] - for subsubvalue in subvalue: - parts.append('<' + escape.utf8(name) + '>') - self._render_parts(subsubvalue, parts) - parts.append('') - else: - raise Exception("Unknown S3 value type %r", value) - - def head(self, *args, **kwargs): - return self.get(*args, **kwargs) - - -class RootHandler(BaseRequestHandler): - - def get(self): - buckets = [b for b in bucket.Bucket.all() if b.is_authorized(self.context)] - - self.render_xml({"ListAllMyBucketsResult": { + render_xml(request, {"ListAllMyBucketsResult": { "Buckets": {"Bucket": [b.metadata for b in buckets]}, }}) + return server.NOT_DONE_YET +class BucketResource(Resource): + def __init__(self, name): + Resource.__init__(self) + self.name = name -class BucketHandler(BaseRequestHandler): - def get(self, bucket_name): - logging.debug("List keys for bucket %s" % (bucket_name)) + def getChild(self, name, request): + if name == '': + return self + else: + return ObjectResource(bucket.Bucket(self.name), name) - bucket_object = bucket.Bucket(bucket_name) + def render_GET(self, request): + logging.debug("List keys for bucket %s" % (self.name)) - if not bucket_object.is_authorized(self.context): + bucket_object = bucket.Bucket(self.name) + + if not bucket_object.is_authorized(request.context): raise exception.NotAuthorized - prefix = self.get_argument("prefix", u"") - marker = self.get_argument("marker", u"") - max_keys = int(self.get_argument("max-keys", 1000)) - terse = int(self.get_argument("terse", 0)) + prefix = get_argument(request, "prefix", u"") + marker = get_argument(request, "marker", u"") + max_keys = int(get_argument(request, "max-keys", 1000)) + terse = int(get_argument(request, "terse", 0)) results = bucket_object.list_keys(prefix=prefix, marker=marker, max_keys=max_keys, terse=terse) - self.render_xml({"ListBucketResult": results}) + render_xml(request, {"ListBucketResult": results}) + return server.NOT_DONE_YET - def put(self, bucket_name): - logging.debug("Creating bucket %s" % (bucket_name)) + def render_PUT(self, request): + logging.debug("Creating bucket %s" % (self.name)) try: - print 'user is %s' % self.context + print 'user is %s' % request.context except Exception, e: logging.exception(e) - bucket.Bucket.create(bucket_name, self.context) - self.finish() + logging.debug("calling bucket.Bucket.create(%r, %r)" % (self.name, request.context)) + bucket.Bucket.create(self.name, request.context) + return '' - def delete(self, bucket_name): - logging.debug("Deleting bucket %s" % (bucket_name)) - bucket_object = bucket.Bucket(bucket_name) + def render_DELETE(self, request): + logging.debug("Deleting bucket %s" % (self.name)) + bucket_object = bucket.Bucket(self.name) - if not bucket_object.is_authorized(self.context): + if not bucket_object.is_authorized(request.context): raise exception.NotAuthorized bucket_object.delete() - self.set_status(204) - self.finish() + request.setResponseCode(204) + return '' -class ObjectHandler(BaseRequestHandler): - def get(self, bucket_name, object_name): - logging.debug("Getting object: %s / %s" % (bucket_name, object_name)) +class ObjectResource(Resource): + def __init__(self, bucket, name): + Resource.__init__(self) + self.bucket = bucket + self.name = name - bucket_object = bucket.Bucket(bucket_name) + def render_GET(self, request): + logging.debug("Getting object: %s / %s" % (self.bucket.name, self.name)) - if not bucket_object.is_authorized(self.context): + if not self.bucket.is_authorized(request.context): raise exception.NotAuthorized - obj = bucket_object[urllib.unquote(object_name)] - self.set_header("Content-Type", "application/unknown") - self.set_header("Last-Modified", datetime.datetime.utcfromtimestamp(obj.mtime)) - self.set_header("Etag", '"' + obj.md5 + '"') - self.finish(obj.read()) + obj = self.bucket[urllib.unquote(self.name)] + request.setHeader("Content-Type", "application/unknown") + request.setHeader("Last-Modified", datetime.datetime.utcfromtimestamp(obj.mtime)) + request.setHeader("Etag", '"' + obj.md5 + '"') + return static.File(obj.path).render_GET(request) - def put(self, bucket_name, object_name): - logging.debug("Putting object: %s / %s" % (bucket_name, object_name)) - bucket_object = bucket.Bucket(bucket_name) + def render_PUT(self, request): + logging.debug("Putting object: %s / %s" % (self.bucket.name, self.name)) - if not bucket_object.is_authorized(self.context): + if not self.bucket.is_authorized(request.context): raise exception.NotAuthorized - key = urllib.unquote(object_name) - print 'seeking' - self.request.content.seek(0, 0) - print 'writing' - bucket_object[key] = self.request.content.read() - print 'etag %s' % bucket_object[key].md5 - self.set_header("Etag", '"' + bucket_object[key].md5 + '"') - self.finish() + key = urllib.unquote(self.name) + request.content.seek(0, 0) + self.bucket[key] = request.content.read() + request.setHeader("Etag", '"' + self.bucket[key].md5 + '"') + finish(request) + return server.NOT_DONE_YET - def delete(self, bucket_name, object_name): - logging.debug("Deleting object: %s / %s" % (bucket_name, object_name)) - bucket_object = bucket.Bucket(bucket_name) + def render_DELETE(self, request): + logging.debug("Deleting object: %s / %s" % (self.bucket.name, self.name)) - if not bucket_object.is_authorized(self.context): + if not self.bucket.is_authorized(request.context): raise exception.NotAuthorized - del bucket_object[urllib.unquote(object_name)] - self.set_status(204) - self.finish() + del self.bucket[urllib.unquote(self.name)] + request.setResponseCode(204) + return '' +class ImageResource(Resource): + isLeaf = True -class ImageDownloadHandler(BaseRequestHandler): - SUPPORTED_METHODS = ("GET", ) + def getChild(self, name, request): + if name == '': + return self + else: + request.setHeader("Content-Type", "application/octet-stream") + img = image.Image(name) + return static.File(img.image_path) - @catch_nova_exceptions - def get(self, image_id): - """ send the decrypted image file - - streaming content through python is slow and should only be used - in development mode. You should serve files via a web server - in production. - """ - - self.set_header("Content-Type", "application/octet-stream") - - READ_SIZE = 1024*1024 - - img = image.Image(image_id) - with open(img.image_path, 'rb') as fp: - chunk = fp.read(READ_SIZE) - while chunk: - self.write(chunk) - self.flush() - chunk = fp.read(READ_SIZE) - - self.finish() - -class ImageHandler(BaseRequestHandler): - SUPPORTED_METHODS = ("POST", "PUT", "GET", "DELETE") - - def get(self): + def render_GET(self, request): """ returns a json listing of all images that a user has permissions to see """ images = [i for i in image.Image.all() if i.is_authorized(self.context)] - self.finish(json.dumps([i.metadata for i in images])) + request.write(json.dumps([i.metadata for i in images])) + return server.NOT_DONE_YET - def put(self): + def render_PUT(self, request): """ create a new registered image """ - image_id = self.get_argument('image_id', u'') - image_location = self.get_argument('image_location', u'') + image_id = get_argument(request, 'image_id', u'') + image_location = get_argument(request, 'image_location', u'') image_path = os.path.join(FLAGS.images_path, image_id) if not image_path.startswith(FLAGS.images_path) or \ @@ -345,9 +270,9 @@ class ImageHandler(BaseRequestHandler): p = multiprocessing.Process(target=image.Image.register_aws_image, args=(image_id, image_location, self.context)) p.start() - self.finish() + return '' - def post(self): + def render_POST(self): """ update image attributes: public/private """ image_id = self.get_argument('image_id', u'') @@ -360,9 +285,9 @@ class ImageHandler(BaseRequestHandler): image_object.set_public(operation=='add') - self.finish() + return '' - def delete(self): + def render_DELETE(self): """ delete a registered image """ image_id = self.get_argument("image_id", u"") image_object = image.Image(image_id) @@ -372,8 +297,10 @@ class ImageHandler(BaseRequestHandler): image_object.delete() - self.set_status(204) + request.setResponseCode(204) + return '' -factory = server.Site(Application()) +root = S3() +factory = server.Site(root) reactor.listenTCP(3333, factory) reactor.run() From 6892ff871b1a154bbe669bf5cb10eab638fb181b Mon Sep 17 00:00:00 2001 From: Joshua McKenty Date: Fri, 16 Jul 2010 14:02:37 -0700 Subject: [PATCH 06/10] Ack messages during call so rabbit leaks less. --- nova/rpc.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nova/rpc.py b/nova/rpc.py index 72a84b7f7583..99e820ff3dc9 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -197,7 +197,10 @@ def call(topic, msg): conn = Connection.instance() d = defer.Deferred() consumer = DirectConsumer(connection=conn, msg_id=msg_id) - consumer.register_callback(lambda data, message: d.callback(data)) + def deferred_receive(data, message): + message.ack() + d.callback(data) + consumer.register_callback(deferred_receive) injected = consumer.attach_to_tornado() # clean up after the injected listened and return x From f6aeb0a121e76aefa8b6af6ae602df76c2419b2e Mon Sep 17 00:00:00 2001 From: Joshua McKenty Date: Fri, 16 Jul 2010 14:07:57 -0700 Subject: [PATCH 07/10] Makin the queues non-durable by default --- nova/rpc.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nova/rpc.py b/nova/rpc.py index 99e820ff3dc9..c6ebb1160a95 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -112,6 +112,7 @@ class TopicConsumer(Consumer): self.queue = topic self.routing_key = topic self.exchange = FLAGS.control_exchange + self.durable = False super(TopicConsumer, self).__init__(connection=connection) @@ -238,7 +239,8 @@ def send_message(topic, message, wait=True): exchange=msg_id, auto_delete=True, exchange_type="direct", - routing_key=msg_id) + routing_key=msg_id, + durable=False) consumer.register_callback(generic_response) publisher = messaging.Publisher(connection=Connection.instance(), From 9e023095e303f096ac0d2914ed427d2a37d1444d Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Sat, 17 Jul 2010 23:00:53 -0700 Subject: [PATCH 08/10] Replace nova-objectstore with a twistd style wrapper. Add a get_application method to objectstore handler. --- bin/nova-objectstore | 25 +++++++++++-------------- nova/objectstore/handler.py | 29 ++++++++++++++--------------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/bin/nova-objectstore b/bin/nova-objectstore index 7876864c0f7d..5ac911f092a1 100755 --- a/bin/nova-objectstore +++ b/bin/nova-objectstore @@ -18,35 +18,32 @@ # under the License. """ - Tornado daemon for nova objectstore. Supports S3 API. + Twisted daemon for nova objectstore. Supports S3 API. """ import logging -from nova import vendor -from tornado import httpserver -from tornado import ioloop - from nova import flags -from nova import server from nova import utils -from nova.auth import users +from nova import twistd from nova.objectstore import handler FLAGS = flags.FLAGS -def main(argv): +def main(): # FIXME: if this log statement isn't here, no logging # appears from other files and app won't start daemonized logging.debug('Started HTTP server on %s' % (FLAGS.s3_internal_port)) - app = handler.Application(users.UserManager()) - server = httpserver.HTTPServer(app) - server.listen(FLAGS.s3_internal_port) - ioloop.IOLoop.instance().start() - + app = handler.get_application() + print app + return app +# NOTE(soren): Stolen from nova-compute if __name__ == '__main__': + twistd.serve(__file__) + +if __name__ == '__builtin__': utils.default_flagfile() - server.serve('nova-objectstore', main) + application = main() diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py index 50c56c83c79a..63ed34f2d58d 100644 --- a/nova/objectstore/handler.py +++ b/nova/objectstore/handler.py @@ -42,13 +42,10 @@ import logging import json import multiprocessing import os -import re import time import urllib - -from nova import vendor - +from twisted.application import internet, service from twisted.web.resource import Resource from twisted.web import server, static from twisted.internet import reactor @@ -64,7 +61,6 @@ from nova.objectstore import image FLAGS = flags.FLAGS -FLAGS.fake_users = True def render_xml(request, value): assert isinstance(value, dict) and len(value) == 1 @@ -264,15 +260,15 @@ class ImageResource(Resource): bucket_object = bucket.Bucket(image_location.split("/")[0]) manifest = image_location[len(image_location.split('/')[0])+1:] - if not bucket_object.is_authorized(self.context): + if not bucket_object.is_authorized(request.context): raise exception.NotAuthorized p = multiprocessing.Process(target=image.Image.register_aws_image, - args=(image_id, image_location, self.context)) + args=(image_id, image_location, request.context)) p.start() return '' - def render_POST(self): + def render_POST(self, request): """ update image attributes: public/private """ image_id = self.get_argument('image_id', u'') @@ -280,19 +276,19 @@ class ImageResource(Resource): image_object = image.Image(image_id) - if not image.is_authorized(self.context): + if not image.is_authorized(request.context): raise exception.NotAuthorized image_object.set_public(operation=='add') return '' - def render_DELETE(self): + def render_DELETE(self, request): """ delete a registered image """ image_id = self.get_argument("image_id", u"") image_object = image.Image(image_id) - if not image.is_authorized(self.context): + if not image.is_authorized(request.context): raise exception.NotAuthorized image_object.delete() @@ -300,7 +296,10 @@ class ImageResource(Resource): request.setResponseCode(204) return '' -root = S3() -factory = server.Site(root) -reactor.listenTCP(3333, factory) -reactor.run() +def get_application(): + root = S3() + factory = server.Site(root) + application = service.Application("objectstore") + objectStoreService = internet.TCPServer(FLAGS.s3_port, factory) + objectStoreService.setServiceParent(application) + return application From 2d5124c3f2c6e4e78dc09eb8f38cb125641b9b1c Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Sat, 17 Jul 2010 23:04:46 -0700 Subject: [PATCH 09/10] Remove s3_internal_port setting. Objectstore should be able to handle the beatings now. As such, nginx is no longer needed, so it's removed from the dependencies and the configuration files are removed. --- bin/nova-objectstore | 2 +- debian/control | 2 +- debian/nova-objectstore.install | 1 - debian/nova-objectstore.links | 1 - debian/nova-objectstore.nginx.conf | 17 ----------------- nova/flags.py | 1 - 6 files changed, 2 insertions(+), 22 deletions(-) delete mode 100644 debian/nova-objectstore.links delete mode 100644 debian/nova-objectstore.nginx.conf diff --git a/bin/nova-objectstore b/bin/nova-objectstore index 5ac911f092a1..9385fd299bcd 100755 --- a/bin/nova-objectstore +++ b/bin/nova-objectstore @@ -35,7 +35,7 @@ FLAGS = flags.FLAGS def main(): # FIXME: if this log statement isn't here, no logging # appears from other files and app won't start daemonized - logging.debug('Started HTTP server on %s' % (FLAGS.s3_internal_port)) + logging.debug('Started HTTP server on %s' % (FLAGS.s3_port)) app = handler.get_application() print app return app diff --git a/debian/control b/debian/control index 17414bb7a958..a6d12f36eb80 100644 --- a/debian/control +++ b/debian/control @@ -91,7 +91,7 @@ Description: Nova Cloud Computing - API frontend Package: nova-objectstore Architecture: all -Depends: nova-common (= ${binary:Version}), nginx, ${python:Depends}, ${misc:Depends} +Depends: nova-common (= ${binary:Version}), ${python:Depends}, ${misc:Depends} Description: Nova Cloud Computing - object store Nova is a cloud computing fabric controller (the main part of an IaaS system) built to match the popular AWS EC2 and S3 APIs. It is written in diff --git a/debian/nova-objectstore.install b/debian/nova-objectstore.install index 3ed93ff37b4a..c5b3d997a8f1 100644 --- a/debian/nova-objectstore.install +++ b/debian/nova-objectstore.install @@ -1,3 +1,2 @@ bin/nova-objectstore usr/bin debian/nova-objectstore.conf etc/nova -debian/nova-objectstore.nginx.conf etc/nginx/sites-available diff --git a/debian/nova-objectstore.links b/debian/nova-objectstore.links deleted file mode 100644 index 38e33948e22a..000000000000 --- a/debian/nova-objectstore.links +++ /dev/null @@ -1 +0,0 @@ -/etc/nginx/sites-available/nova-objectstore.nginx.conf /etc/nginx/sites-enabled/nova-objectstore.nginx.conf diff --git a/debian/nova-objectstore.nginx.conf b/debian/nova-objectstore.nginx.conf deleted file mode 100644 index b63424150c29..000000000000 --- a/debian/nova-objectstore.nginx.conf +++ /dev/null @@ -1,17 +0,0 @@ -server { - listen 3333 default; - server_name localhost; - client_max_body_size 10m; - - access_log /var/log/nginx/localhost.access.log; - - location ~ /_images/.+ { - root /var/lib/nova/images; - rewrite ^/_images/(.*)$ /$1 break; - } - - location / { - proxy_pass http://localhost:3334/; - } -} - diff --git a/nova/flags.py b/nova/flags.py index 22e00a44a2c3..ae8bf98f7332 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -37,7 +37,6 @@ DEFINE_bool = DEFINE_bool # http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#39 DEFINE_integer('s3_port', 3333, 's3 port') -DEFINE_integer('s3_internal_port', 3334, 's3 port') DEFINE_string('s3_host', '127.0.0.1', 's3 host') #DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on') DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on') From d5309eff30b1a826f075b28935de2a4b89eede6e Mon Sep 17 00:00:00 2001 From: Ewan Mellor Date: Sun, 18 Jul 2010 18:02:04 +0100 Subject: [PATCH 10/10] Fixed references to nova.utils that were broken by a change of import statement in the remove-vendor merge. --- nova/compute/linux_net.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nova/compute/linux_net.py b/nova/compute/linux_net.py index 7b6ae693a6ae..00c64d81a3bf 100644 --- a/nova/compute/linux_net.py +++ b/nova/compute/linux_net.py @@ -33,13 +33,13 @@ def execute(cmd): logging.debug("FAKE NET: %s" % cmd) return "fake", 0 else: - return nova.utils.execute(cmd) + return utils.execute(cmd) def runthis(desc, cmd): if FLAGS.fake_network: return execute(cmd) else: - return nova.utils.runthis(desc,cmd) + return utils.runthis(desc,cmd) def Popen(cmd): if FLAGS.fake_network: