Add --before to nova-manage db archive_deleted_rows

Add a parameter to limit the archival of deleted rows by date. That is,
only rows related to instances deleted before the provided date will be
archived.

This option works together with --max_rows; if both are specified, both
take effect.
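
For example (illustrative values), archiving in batches of 1000
everything deleted before the start of 2017:

    nova-manage db archive_deleted_rows --max_rows 1000 --before 2017-01-01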

Closes-Bug: #1751192
Change-Id: I408c22d8eada0518ec5d685213f250e8e3dae76e
Implements: blueprint nova-archive-before
Author: Jake Yip
Date:   2018-02-20 16:14:10 +11:00
Parent: b301f95ab7
Commit: e822360b66

7 changed files with 204 additions and 73 deletions

doc/source/cli/nova-manage.rst

@@ -58,7 +58,7 @@ Nova Database
determined by ``[database]/connection`` in the configuration file passed to
nova-manage.
``nova-manage db archive_deleted_rows [--max_rows <number>] [--verbose] [--until-complete] [--purge]``
``nova-manage db archive_deleted_rows [--max_rows <number>] [--verbose] [--until-complete] [--before <date>] [--purge]``
Move deleted rows from production tables to shadow tables. Note that the
corresponding rows in the ``instance_mappings``, ``request_specs`` and
``instance_group_member`` tables of the API database are purged when
@@ -68,7 +68,9 @@ Nova Database
``--until-complete`` will make the command run continuously until all
deleted rows are archived. Use the ``--max_rows`` option, which defaults to
1000, as a batch size for each iteration (note that the purged API database
table records are not included in this batch size). Specifying ``--purge``
table records are not included in this batch size). Specifying ``--before``
will archive only instances that were deleted before the date provided, and
records in other tables related to those instances. Specifying ``--purge``
will cause a `full` DB purge to be completed after archival. If a date
range is desired for the purge, then run ``nova-manage db purge --before
<date>`` manually after archiving is complete.
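
A sketch of the workflow described above, with illustrative dates and
batch size:

    # archive rows of instances deleted before 2018-01-01, 500 per batch
    nova-manage db archive_deleted_rows --until-complete --max_rows 500 \
        --before 2018-01-01
    # then purge the shadow tables for the same date range
    nova-manage db purge --before 2018-01-01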

nova/cmd/manage.py

@@ -487,6 +487,9 @@ Error: %s""") % six.text_type(e))
'Note that this number does not include the corresponding '
'rows, if any, that are removed from the API database for '
'deleted instances.')
@args('--before', metavar='<date>',
help=('Archive rows that have been deleted before this date '
'(YYYY-MM-DD)'))
@args('--verbose', action='store_true', dest='verbose', default=False,
help='Print how many rows were archived per table.')
@args('--until-complete', action='store_true', dest='until_complete',
@@ -496,13 +499,15 @@ Error: %s""") % six.text_type(e))
@args('--purge', action='store_true', dest='purge', default=False,
help='Purge all data from shadow tables after archive completes')
def archive_deleted_rows(self, max_rows=1000, verbose=False,
until_complete=False, purge=False):
until_complete=False, purge=False,
before=None):
"""Move deleted rows from production tables to shadow tables.
Returns 0 if nothing was archived, 1 if some number of rows were
archived, 2 if max_rows is invalid, 3 if no connection could be
established to the API DB. If automating, this should be
run continuously while the result is 1, stopping at 0.
established to the API DB, 4 if the before date is invalid. If
automating, this should be run continuously while the result is 1,
stopping at 0.
"""
max_rows = int(max_rows)
if max_rows < 0:
@@ -526,13 +531,23 @@ Error: %s""") % six.text_type(e))
'command again.'))
return 3
if before:
try:
before_date = dateutil_parser.parse(before, fuzzy=True)
except ValueError as e:
print(_('Invalid value for --before: %s') % e)
return 4
else:
before_date = None
table_to_rows_archived = {}
deleted_instance_uuids = []
if until_complete and verbose:
sys.stdout.write(_('Archiving') + '..') # noqa
while True:
try:
run, deleted_instance_uuids = db.archive_deleted_rows(max_rows)
run, deleted_instance_uuids = db.archive_deleted_rows(
max_rows, before=before_date)
except KeyboardInterrupt:
run = {}
if until_complete and verbose:

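The validation above leans on dateutil's fuzzy parser. A standalone
sketch of the same logic (parse_before is a hypothetical name, not Nova
code):

    from dateutil import parser as dateutil_parser

    def parse_before(before):
        # Returns a datetime on success; mirrors the command's use of
        # exit code 4 for an unparseable --before value.
        try:
            return dateutil_parser.parse(before, fuzzy=True)
        except ValueError as e:
            print('Invalid value for --before: %s' % e)
            raise SystemExit(4)

    print(parse_before('2017-01-13'))        # 2017-01-13 00:00:00
    print(parse_before('January 13, 2017'))  # fuzzy parsing tolerates prose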
nova/db/api.py

@@ -1808,7 +1808,7 @@ def task_log_get(context, task_name, period_beginning,
####################
def archive_deleted_rows(max_rows=None):
def archive_deleted_rows(max_rows=None, before=None):
"""Move up to max_rows rows from production tables to corresponding shadow
tables.
@@ -1824,7 +1824,7 @@ def archive_deleted_rows(max_rows=None):
}
"""
return IMPL.archive_deleted_rows(max_rows=max_rows)
return IMPL.archive_deleted_rows(max_rows=max_rows, before=before)
def pcidevice_online_data_migration(context, max_count):

nova/db/sqlalchemy/api.py

@@ -5416,7 +5416,7 @@ def task_log_end_task(context, task_name, period_beginning, period_ending,
def _archive_if_instance_deleted(table, shadow_table, instances, conn,
max_rows):
max_rows, before):
"""Look for records that pertain to deleted instances, but may not be
deleted themselves. This catches cases where we delete an instance,
but leave some residue because of a failure in a cleanup path or
@@ -5430,38 +5430,27 @@ def _archive_if_instance_deleted(table, shadow_table, instances, conn,
# instance_actions.id not instances.uuid
if table.name == "instance_actions_events":
instance_actions = models.BASE.metadata.tables["instance_actions"]
query_insert = shadow_table.insert(inline=True).\
from_select(
[c.name for c in table.c],
sql.select(
[table], and_(
instances.c.deleted != instances.c.deleted.default.arg,
instances.c.uuid == instance_actions.c.instance_uuid,
instance_actions.c.id == table.c.action_id)).
order_by(table.c.id).limit(max_rows))
query_select = sql.select(
[table],
and_(instances.c.deleted != instances.c.deleted.default.arg,
instances.c.uuid == instance_actions.c.instance_uuid,
instance_actions.c.id == table.c.action_id))
query_delete = sql.select(
[table.c.id],
and_(instances.c.deleted != instances.c.deleted.default.arg,
instances.c.uuid == instance_actions.c.instance_uuid,
instance_actions.c.id == table.c.action_id)).\
order_by(table.c.id).limit(max_rows)
else:
query_insert = shadow_table.insert(inline=True).\
from_select(
[c.name for c in table.c],
sql.select(
[table], and_(
instances.c.deleted != instances.c.deleted.default.arg,
instances.c.uuid == table.c.instance_uuid)).
order_by(table.c.id).limit(max_rows))
query_select = sql.select(
[table],
and_(instances.c.deleted != instances.c.deleted.default.arg,
instances.c.uuid == table.c.instance_uuid))
query_delete = sql.select(
[table.c.id],
and_(instances.c.deleted != instances.c.deleted.default.arg,
instances.c.uuid == table.c.instance_uuid)).\
order_by(table.c.id).limit(max_rows)
delete_statement = DeleteFromSelect(table, query_delete,
if before:
query_select = query_select.where(instances.c.deleted_at < before)
query_select = query_select.order_by(table.c.id).limit(max_rows)
query_insert = shadow_table.insert(inline=True).\
from_select([c.name for c in table.c], query_select)
delete_statement = DeleteFromSelect(table, query_select,
table.c.id)
try:
@@ -5476,7 +5465,7 @@ def _archive_if_instance_deleted(table, shadow_table, instances, conn,
return 0
def _archive_deleted_rows_for_table(tablename, max_rows):
def _archive_deleted_rows_for_table(tablename, max_rows, before):
    """Move up to max_rows rows from one table to the corresponding
shadow table.
@@ -5513,8 +5502,11 @@ def _archive_deleted_rows_for_table(tablename, max_rows):
columns = [c.name for c in table.c]
select = sql.select([column],
deleted_column != deleted_column.default.arg).\
order_by(column).limit(max_rows)
deleted_column != deleted_column.default.arg)
if before:
select = select.where(table.c.deleted_at < before)
select = select.order_by(column).limit(max_rows)
rows = conn.execute(select).fetchall()
records = [r[0] for r in rows]
@@ -5554,13 +5546,13 @@ def _archive_deleted_rows_for_table(tablename, max_rows):
instances = models.BASE.metadata.tables['instances']
limit = max_rows - rows_archived if max_rows is not None else None
extra = _archive_if_instance_deleted(table, shadow_table, instances,
conn, limit)
conn, limit, before)
rows_archived += extra
return rows_archived, deleted_instance_uuids
def archive_deleted_rows(max_rows=None):
def archive_deleted_rows(max_rows=None, before=None):
"""Move up to max_rows rows from production tables to the corresponding
shadow tables.
@@ -5590,9 +5582,11 @@ def archive_deleted_rows(max_rows=None):
if (tablename == 'migrate_version' or
tablename.startswith(_SHADOW_TABLE_PREFIX)):
continue
rows_archived,\
deleted_instance_uuid = _archive_deleted_rows_for_table(
tablename, max_rows=max_rows - total_rows_archived)
rows_archived, deleted_instance_uuid = (
_archive_deleted_rows_for_table(
tablename,
max_rows=max_rows - total_rows_archived,
before=before))
total_rows_archived += rows_archived
if tablename == 'instances':
deleted_instance_uuids = deleted_instance_uuid

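The key move in this refactor is collapsing the duplicated insert/delete
selects into a single shared query_select. A condensed sketch of the
pattern (build_archive_queries is hypothetical; assumes SQLAlchemy < 1.4
and oslo.db, and shows only the simpler non-instance_actions_events
branch):

    from oslo_db.sqlalchemy.utils import DeleteFromSelect
    from sqlalchemy import sql

    def build_archive_queries(table, shadow_table, instances,
                              before, max_rows):
        # One base SELECT of rows belonging to deleted instances...
        query_select = sql.select(
            [table],
            sql.and_(
                instances.c.deleted != instances.c.deleted.default.arg,
                instances.c.uuid == table.c.instance_uuid))
        # ...optionally narrowed by deletion date (strictly <, not <=)...
        if before is not None:
            query_select = query_select.where(
                instances.c.deleted_at < before)
        query_select = query_select.order_by(table.c.id).limit(max_rows)
        # ...then reused for both statements, so the rows copied to the
        # shadow table and the rows deleted can never drift apart.
        query_insert = shadow_table.insert(inline=True).from_select(
            [c.name for c in table.c], query_select)
        delete_statement = DeleteFromSelect(table, query_select, table.c.id)
        return query_insert, delete_statement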
nova/tests/unit/db/test_db_api.py

@@ -21,6 +21,7 @@
import copy
import datetime
from dateutil import parser as dateutil_parser
import iso8601
import mock
import netaddr
@@ -8449,6 +8450,12 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
self.instances = models.Instance.__table__
self.shadow_instances = sqlalchemyutils.get_table(
self.engine, "shadow_instances")
self.instance_actions = models.InstanceAction.__table__
self.shadow_instance_actions = sqlalchemyutils.get_table(
self.engine, "shadow_instance_actions")
self.instance_actions_events = models.InstanceActionEvent.__table__
self.shadow_instance_actions_events = sqlalchemyutils.get_table(
self.engine, "shadow_instance_actions_events")
self.migrations = models.Migration.__table__
self.shadow_migrations = sqlalchemyutils.get_table(
self.engine, "shadow_migrations")
@@ -8509,7 +8516,7 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Set 4 to deleted
update_statement = self.instance_id_mappings.update().\
where(self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:4]))\
.values(deleted=1)
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement)
qiim = sql.select([self.instance_id_mappings]).where(self.
instance_id_mappings.c.uuid.in_(self.uuidstrs))
@@ -8557,6 +8564,89 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
self._assert_shadow_tables_empty_except(
'shadow_instance_id_mappings')
def test_archive_deleted_rows_before(self):
# Add 6 rows to table
for uuidstr in self.uuidstrs:
ins_stmt = self.instances.insert().values(uuid=uuidstr)
self.conn.execute(ins_stmt)
ins_stmt = self.instance_actions.insert().\
values(instance_uuid=uuidstr)
result = self.conn.execute(ins_stmt)
instance_action_uuid = result.inserted_primary_key[0]
ins_stmt = self.instance_actions_events.insert().\
values(action_id=instance_action_uuid)
self.conn.execute(ins_stmt)
# Set 1 to deleted at 2017-01-01
deleted_at = timeutils.parse_strtime('2017-01-01T00:00:00.0')
update_statement = self.instances.update().\
where(self.instances.c.uuid.in_(self.uuidstrs[0:1]))\
.values(deleted=1, deleted_at=deleted_at)
self.conn.execute(update_statement)
# Set 1 to deleted at 2017-01-02
deleted_at = timeutils.parse_strtime('2017-01-02T00:00:00.0')
update_statement = self.instances.update().\
where(self.instances.c.uuid.in_(self.uuidstrs[1:2]))\
.values(deleted=1, deleted_at=deleted_at)
self.conn.execute(update_statement)
# Set 2 to deleted now
update_statement = self.instances.update().\
where(self.instances.c.uuid.in_(self.uuidstrs[2:4]))\
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement)
qiim = sql.select([self.instances]).where(self.
instances.c.uuid.in_(self.uuidstrs))
qsiim = sql.select([self.shadow_instances]).\
where(self.shadow_instances.c.uuid.in_(self.uuidstrs))
# Verify we have 6 in main
rows = self.conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 6)
# Make sure 'before' comparison is < not <=; nothing is archived
before_date = dateutil_parser.parse('2017-01-01', fuzzy=True)
_, uuids = db.archive_deleted_rows(max_rows=1, before=before_date)
self.assertEqual([], uuids)
# Archive rows deleted before 2017-01-02
before_date = dateutil_parser.parse('2017-01-02', fuzzy=True)
results = db.archive_deleted_rows(max_rows=100, before=before_date)
expected = dict(instances=1,
instance_actions=1,
instance_actions_events=1)
self._assertEqualObjects(expected, results[0])
# Archive 1 row deleted before 2017-01-03. instance_actions_events
# should be the table with the row deleted, due to FK constraints
before_date = dateutil_parser.parse('2017-01-03', fuzzy=True)
results = db.archive_deleted_rows(max_rows=1, before=before_date)
expected = dict(instance_actions_events=1)
self._assertEqualObjects(expected, results[0])
# Archive all other rows deleted before 2017-01-03. This should
# delete row in instance_actions, then row in instances due to FK
# constraints
results = db.archive_deleted_rows(max_rows=100, before=before_date)
expected = dict(instances=1, instance_actions=1)
self._assertEqualObjects(expected, results[0])
# Verify we have 4 left in main
rows = self.conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 4)
# Verify we have 2 in shadow
rows = self.conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 2)
# Archive everything else, make sure default operation without
# before argument didn't break
results = db.archive_deleted_rows(max_rows=1000)
# Verify we have 2 left in main
rows = self.conn.execute(qiim).fetchall()
self.assertEqual(len(rows), 2)
# Verify we have 4 in shadow
rows = self.conn.execute(qsiim).fetchall()
self.assertEqual(len(rows), 4)
def test_archive_deleted_rows_for_every_uuid_table(self):
tablenames = []
for model_class in six.itervalues(models.__dict__):
@@ -8591,7 +8681,7 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Set 4 to deleted
update_statement = main_table.update().\
where(main_table.c.uuid.in_(self.uuidstrs[:4]))\
.values(deleted=1)
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement)
qmt = sql.select([main_table]).where(main_table.c.uuid.in_(
self.uuidstrs))
@@ -8604,7 +8694,8 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Verify we have 0 in shadow
self.assertEqual(len(rows), 0)
# Archive 2 rows
sqlalchemy_api._archive_deleted_rows_for_table(tablename, max_rows=2)
sqlalchemy_api._archive_deleted_rows_for_table(tablename, max_rows=2,
before=None)
# Verify we have 4 left in main
rows = self.conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 4)
@@ -8612,7 +8703,8 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
rows = self.conn.execute(qst).fetchall()
self.assertEqual(len(rows), 2)
# Archive 2 more rows
sqlalchemy_api._archive_deleted_rows_for_table(tablename, max_rows=2)
sqlalchemy_api._archive_deleted_rows_for_table(tablename, max_rows=2,
before=None)
# Verify we have 2 left in main
rows = self.conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 2)
@@ -8620,7 +8712,8 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
rows = self.conn.execute(qst).fetchall()
self.assertEqual(len(rows), 4)
# Try to archive more, but there are no deleted rows left.
sqlalchemy_api._archive_deleted_rows_for_table(tablename, max_rows=2)
sqlalchemy_api._archive_deleted_rows_for_table(tablename, max_rows=2,
before=None)
# Verify we still have 2 left in main
rows = self.conn.execute(qmt).fetchall()
self.assertEqual(len(rows), 2)
@@ -8635,7 +8728,7 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
self.conn.execute(ins_stmt)
update_statement = self.dns_domains.update().\
where(self.dns_domains.c.domain == uuidstr0).\
values(deleted=True)
values(deleted=True, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement)
qdd = sql.select([self.dns_domains], self.dns_domains.c.domain ==
uuidstr0)
@@ -8703,24 +8796,29 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
def test_archive_deleted_rows_fk_constraint(self):
# consoles.pool_id depends on console_pools.id
self._check_sqlite_version_less_than_3_7()
ins_stmt = self.console_pools.insert().values(deleted=1)
ins_stmt = self.console_pools.insert().values(deleted=1,
deleted_at=timeutils.utcnow())
result = self.conn.execute(ins_stmt)
id1 = result.inserted_primary_key[0]
ins_stmt = self.consoles.insert().values(deleted=1,
pool_id=id1)
deleted_at=timeutils.utcnow(),
pool_id=id1)
result = self.conn.execute(ins_stmt)
result.inserted_primary_key[0]
# The first try to archive console_pools should fail, due to FK.
num = sqlalchemy_api._archive_deleted_rows_for_table("console_pools",
max_rows=None)
max_rows=None,
before=None)
self.assertEqual(num[0], 0)
# Then archiving consoles should work.
num = sqlalchemy_api._archive_deleted_rows_for_table("consoles",
max_rows=None)
max_rows=None,
before=None)
self.assertEqual(num[0], 1)
# Then archiving console_pools should work.
num = sqlalchemy_api._archive_deleted_rows_for_table("console_pools",
max_rows=None)
max_rows=None,
before=None)
self.assertEqual(num[0], 1)
self._assert_shadow_tables_empty_except(
'shadow_console_pools',
@@ -8731,23 +8829,28 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
# migrations.instance_uuid depends on instances.uuid
self._check_sqlite_version_less_than_3_7()
instance_uuid = uuidsentinel.instance
ins_stmt = self.instances.insert().values(uuid=instance_uuid,
deleted=1)
ins_stmt = self.instances.insert().values(
uuid=instance_uuid,
deleted=1,
deleted_at=timeutils.utcnow())
self.conn.execute(ins_stmt)
ins_stmt = self.migrations.insert().values(instance_uuid=instance_uuid,
deleted=0)
self.conn.execute(ins_stmt)
# The first try to archive instances should fail, due to FK.
num = sqlalchemy_api._archive_deleted_rows_for_table("instances",
max_rows=None)
max_rows=None,
before=None)
self.assertEqual(0, num[0])
# Then archiving migrations should work.
num = sqlalchemy_api._archive_deleted_rows_for_table("migrations",
max_rows=None)
max_rows=None,
before=None)
self.assertEqual(1, num[0])
# Then archiving instances should work.
num = sqlalchemy_api._archive_deleted_rows_for_table("instances",
max_rows=None)
max_rows=None,
before=None)
self.assertEqual(1, num[0])
self._assert_shadow_tables_empty_except(
'shadow_instances',
@@ -8764,11 +8867,11 @@ class ArchiveTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Set 4 of each to deleted
update_statement = self.instance_id_mappings.update().\
where(self.instance_id_mappings.c.uuid.in_(self.uuidstrs[:4]))\
.values(deleted=1)
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement)
update_statement2 = self.instances.update().\
where(self.instances.c.uuid.in_(self.uuidstrs[:4]))\
.values(deleted=1)
.values(deleted=1, deleted_at=timeutils.utcnow())
self.conn.execute(update_statement2)
# Verify we have 6 in each main table
qiim = sql.select([self.instance_id_mappings]).where(

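The date-boundary assertions in test_archive_deleted_rows_before pin the
comparison down as strict; the same check in isolation (illustrative):

    import datetime

    deleted_at = datetime.datetime(2017, 1, 1)  # row's deleted_at
    cutoff = datetime.datetime(2017, 1, 1)      # --before 2017-01-01
    print(deleted_at < cutoff)  # False: a row deleted exactly at the
                                # cutoff is not archived
    cutoff = datetime.datetime(2017, 1, 2)      # --before 2017-01-02
    print(deleted_at < cutoff)  # True: now it is archived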
nova/tests/unit/test_nova_manage.py

@@ -398,7 +398,7 @@ class DBCommandsTestCase(test.NoDBTestCase):
def _test_archive_deleted_rows(self, mock_get_all, mock_db_archive,
verbose=False):
result = self.commands.archive_deleted_rows(20, verbose=verbose)
mock_db_archive.assert_called_once_with(20)
mock_db_archive.assert_called_once_with(20, before=None)
output = self.output.getvalue()
if verbose:
expected = '''\
@@ -449,9 +449,9 @@ Archiving.....complete
expected = ''
self.assertEqual(expected, self.output.getvalue())
mock_db_archive.assert_has_calls([mock.call(20),
mock.call(20),
mock.call(20)])
mock_db_archive.assert_has_calls([mock.call(20, before=None),
mock.call(20, before=None),
mock.call(20, before=None)])
def test_archive_deleted_rows_until_complete_quiet(self):
self.test_archive_deleted_rows_until_complete(verbose=False)
@@ -487,22 +487,34 @@ Rows were archived, running purge...
expected = ''
self.assertEqual(expected, self.output.getvalue())
mock_db_archive.assert_has_calls([mock.call(20),
mock.call(20),
mock.call(20)])
mock_db_archive.assert_has_calls([mock.call(20, before=None),
mock.call(20, before=None),
mock.call(20, before=None)])
mock_db_purge.assert_called_once_with(mock.ANY, None,
status_fn=mock.ANY)
def test_archive_deleted_rows_until_stopped_quiet(self):
self.test_archive_deleted_rows_until_stopped(verbose=False)
@mock.patch.object(db, 'archive_deleted_rows')
@mock.patch.object(objects.CellMappingList, 'get_all')
def test_archive_deleted_rows_before(self, mock_get_all, mock_db_archive):
mock_db_archive.side_effect = [
({'instances': 10, 'instance_extra': 5}, list()),
({'instances': 5, 'instance_faults': 1}, list()),
KeyboardInterrupt]
result = self.commands.archive_deleted_rows(20, before='2017-01-13')
mock_db_archive.assert_called_once_with(20,
before=datetime.datetime(2017, 1, 13))
self.assertEqual(1, result)
@mock.patch.object(db, 'archive_deleted_rows', return_value=({}, []))
@mock.patch.object(objects.CellMappingList, 'get_all')
def test_archive_deleted_rows_verbose_no_results(self, mock_get_all,
mock_db_archive):
result = self.commands.archive_deleted_rows(20, verbose=True,
purge=True)
mock_db_archive.assert_called_once_with(20)
mock_db_archive.assert_called_once_with(20, before=None)
output = self.output.getvalue()
# If nothing was archived, there should be no purge messages
self.assertIn('Nothing was archived.', output)
@@ -540,7 +552,7 @@ Rows were archived, running purge...
result = self.commands.archive_deleted_rows(20, verbose=verbose)
self.assertEqual(1, result)
mock_db_archive.assert_called_once_with(20)
mock_db_archive.assert_called_once_with(20, before=None)
self.assertEqual(1, mock_reqspec_destroy.call_count)
mock_members_destroy.assert_called_once()

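The expected datetime(2017, 1, 13) in test_archive_deleted_rows_before
falls out of dateutil's handling of bare dates, which parse to midnight
(quick check):

    import datetime
    from dateutil import parser as dateutil_parser

    assert (dateutil_parser.parse('2017-01-13', fuzzy=True)
            == datetime.datetime(2017, 1, 13))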
releasenotes/notes/…

@@ -0,0 +1,5 @@
---
features:
- An option ``--before`` has been added to the
  ``nova-manage db archive_deleted_rows`` command. This option limits
  archiving of records to those deleted before the specified date.