Use paging to list releases from tiller
Tiller has a non-configurable gRPC max response message size. If the list releases response reaches this size it silently truncates the results to be below this size. Thus for armada to be able to reliably get back all the releases it requests, this patchset implements paging with what should be a small enough page size to avoid the truncation. Change-Id: Ic2de85f6eabcea8655b18b411b79a863160b0c81
This commit is contained in:
		| @@ -198,3 +198,19 @@ class TillerVersionException(TillerException): | ||||
|     ''' | ||||
|  | ||||
|     message = 'Failed to get Tiller Version' | ||||
|  | ||||
|  | ||||
| class TillerListReleasesPagingException(TillerException): | ||||
|     ''' | ||||
|     Exception that occurs when paging through releases listed by tiller | ||||
|     and the total releases changes between pages. | ||||
|  | ||||
|     This could occur as tiller does not use anything like continue tokens for | ||||
|     paging as seen in the kubernetes api for example. | ||||
|  | ||||
|     **Troubleshoot:** | ||||
|     *Coming Soon* | ||||
|     ''' | ||||
|  | ||||
|     message = ('Failed to page through tiller releases, possibly due to ' | ||||
|                'releases being added between pages') | ||||
|   | ||||
| @@ -37,13 +37,12 @@ from armada.utils.release import label_selectors | ||||
|  | ||||
| TILLER_VERSION = b'2.10.0' | ||||
| GRPC_EPSILON = 60 | ||||
| RELEASE_LIMIT = 128  # TODO(mark-burnett): There may be a better page size. | ||||
| LIST_RELEASES_PAGE_SIZE = 32 | ||||
| LIST_RELEASES_ATTEMPTS = 3 | ||||
|  | ||||
| # the standard gRPC max message size is 4MB | ||||
| # this expansion comes at a performance penalty | ||||
| # but until proper paging is supported, we need | ||||
| # to support a larger payload as the current | ||||
| # limit is exhausted with just 10 releases | ||||
| # NOTE(seaneagan): This has no effect on the message size limit that tiller | ||||
| # sets for itself which can be seen here: | ||||
| #   https://github.com/helm/helm/blob/2d77db11fa47005150e682fb13c3cf49eab98fbb/pkg/tiller/server.go#L34 | ||||
| MAX_MESSAGE_LENGTH = 429496729 | ||||
|  | ||||
| CONF = cfg.CONF | ||||
| @@ -190,28 +189,60 @@ class Tiller(object): | ||||
|         ''' | ||||
|         # TODO(MarshM possibly combine list_releases() with list_charts() | ||||
|         # since they do the same thing, grouping output differently | ||||
|         releases = [] | ||||
|         stub = ReleaseServiceStub(self.channel) | ||||
|         # TODO(mark-burnett): Since we're limiting page size, we need to | ||||
|         # iterate through all the pages when collecting this list. | ||||
|         # NOTE(MarshM): `Helm List` defaults to returning Deployed and Failed, | ||||
|         # but this might not be a desireable ListReleasesRequest default. | ||||
|         req = ListReleasesRequest( | ||||
|             limit=RELEASE_LIMIT, | ||||
|             status_codes=[const.STATUS_DEPLOYED, const.STATUS_FAILED], | ||||
|             sort_by='LAST_RELEASED', | ||||
|             sort_order='DESC') | ||||
|  | ||||
|         LOG.debug('Tiller ListReleases() with timeout=%s', self.timeout) | ||||
|         release_list = stub.ListReleases( | ||||
|             req, self.timeout, metadata=self.metadata) | ||||
|         # NOTE(seaneagan): Paging through releases to prevent hitting the | ||||
|         # maximum message size limit that tiller sets for it's reponses. | ||||
|         def get_results(): | ||||
|             releases = [] | ||||
|             done = False | ||||
|             next_release_expected = "" | ||||
|             initial_total = None | ||||
|             while not done: | ||||
|                 req = ListReleasesRequest( | ||||
|                     offset=next_release_expected, | ||||
|                     limit=LIST_RELEASES_PAGE_SIZE, | ||||
|                     status_codes=[const.STATUS_DEPLOYED, const.STATUS_FAILED], | ||||
|                     sort_by='LAST_RELEASED', | ||||
|                     sort_order='DESC') | ||||
|  | ||||
|         for y in release_list: | ||||
|             # TODO(MarshM) this log is too noisy, fix later | ||||
|             # LOG.debug('Found release: %s', y.releases | ||||
|             releases.extend(y.releases) | ||||
|                 LOG.debug('Tiller ListReleases() with timeout=%s, request=%s', | ||||
|                           self.timeout, req) | ||||
|                 response = stub.ListReleases( | ||||
|                     req, self.timeout, metadata=self.metadata) | ||||
|  | ||||
|         return releases | ||||
|                 for message in response: | ||||
|                     page = message.releases | ||||
|  | ||||
|                     if initial_total: | ||||
|                         if message.total != initial_total: | ||||
|                             LOG.warning( | ||||
|                                 'Total releases changed between ' | ||||
|                                 'pages from (%s) to (%s)', initial_total, | ||||
|                                 message.count) | ||||
|                             raise ex.TillerListReleasesPagingException() | ||||
|                     else: | ||||
|                         initial_total = message.total | ||||
|  | ||||
|                     # Add page to results. | ||||
|                     releases.extend(page) | ||||
|  | ||||
|                     if message.next: | ||||
|                         next_release_expected = message.next | ||||
|                     else: | ||||
|                         done = True | ||||
|  | ||||
|             return releases | ||||
|  | ||||
|         for index in range(LIST_RELEASES_ATTEMPTS): | ||||
|             attempt = index + 1 | ||||
|             try: | ||||
|                 return get_results() | ||||
|             except ex.TillerListReleasesPagingException: | ||||
|                 LOG.warning('List releases paging failed on attempt %s/%s', | ||||
|                             attempt, LIST_RELEASES_ATTEMPTS) | ||||
|                 if attempt == LIST_RELEASES_ATTEMPTS: | ||||
|                     raise | ||||
|  | ||||
|     def get_chart_templates(self, template_name, name, release_name, namespace, | ||||
|                             chart, disable_hooks, values): | ||||
|   | ||||
| @@ -153,7 +153,11 @@ class TillerTestCase(base.ArmadaTestCase): | ||||
|     @mock.patch.object(tiller.Tiller, '_get_tiller_ip', autospec=True) | ||||
|     @mock.patch('armada.handlers.tiller.K8s', autospec=True) | ||||
|     @mock.patch('armada.handlers.tiller.grpc', autospec=True) | ||||
|     def test_list_releases_empty(self, mock_grpc, _, mock_ip): | ||||
|     @mock.patch('armada.handlers.tiller.ReleaseServiceStub') | ||||
|     def test_list_releases_empty(self, mock_stub, _, __, mock_ip): | ||||
|         message_mock = mock.Mock(count=0, total=5, next='', releases=[]) | ||||
|         mock_stub.return_value.ListReleases.return_value = [message_mock] | ||||
|  | ||||
|         # instantiate Tiller object | ||||
|         tiller_obj = tiller.Tiller() | ||||
|         self.assertEqual([], tiller_obj.list_releases()) | ||||
| @@ -161,7 +165,11 @@ class TillerTestCase(base.ArmadaTestCase): | ||||
|     @mock.patch.object(tiller.Tiller, '_get_tiller_ip', autospec=True) | ||||
|     @mock.patch('armada.handlers.tiller.K8s', autospec=True) | ||||
|     @mock.patch('armada.handlers.tiller.grpc', autospec=True) | ||||
|     def test_list_charts_empty(self, mock_grpc, _, mock_ip): | ||||
|     @mock.patch('armada.handlers.tiller.ReleaseServiceStub') | ||||
|     def test_list_charts_empty(self, mock_stub, _, __, mock_ip): | ||||
|         message_mock = mock.Mock(count=0, total=5, next='', releases=[]) | ||||
|         mock_stub.return_value.ListReleases.return_value = [message_mock] | ||||
|  | ||||
|         # instantiate Tiller object | ||||
|         tiller_obj = tiller.Tiller() | ||||
|         self.assertEqual([], tiller_obj.list_charts()) | ||||
| @@ -170,30 +178,89 @@ class TillerTestCase(base.ArmadaTestCase): | ||||
|     @mock.patch('armada.handlers.tiller.grpc') | ||||
|     @mock.patch.object(tiller, 'ListReleasesRequest') | ||||
|     @mock.patch.object(tiller, 'ReleaseServiceStub') | ||||
|     def test_list_releases(self, mock_release_service_stub, | ||||
|                            mock_list_releases_request, mock_grpc, _): | ||||
|         mock_release_service_stub.return_value.ListReleases. \ | ||||
|             return_value = [mock.Mock(releases=['foo', 'bar'])] | ||||
|     def test_list_releases_single_page( | ||||
|             self, mock_stub, mock_list_releases_request, mock_grpc, _): | ||||
|         releases = [mock.Mock(), mock.Mock()] | ||||
|         mock_stub.return_value.ListReleases.return_value = [ | ||||
|             mock.Mock( | ||||
|                 next='', | ||||
|                 count=len(releases), | ||||
|                 total=len(releases), | ||||
|                 releases=releases) | ||||
|         ] | ||||
|  | ||||
|         tiller_obj = tiller.Tiller('host', '8080', None) | ||||
|         self.assertEqual(['foo', 'bar'], tiller_obj.list_releases()) | ||||
|         self.assertEqual(releases, tiller_obj.list_releases()) | ||||
|  | ||||
|         mock_release_service_stub.assert_called_once_with(tiller_obj.channel) | ||||
|         list_releases_stub = mock_release_service_stub.return_value. \ | ||||
|             ListReleases | ||||
|         list_releases_stub.assert_called_once_with( | ||||
|         mock_stub.assert_called_once_with(tiller_obj.channel) | ||||
|         mock_stub.return_value.ListReleases.assert_called_once_with( | ||||
|             mock_list_releases_request.return_value, | ||||
|             tiller_obj.timeout, | ||||
|             metadata=tiller_obj.metadata) | ||||
|  | ||||
|         mock_list_releases_request.assert_called_once_with( | ||||
|             limit=tiller.RELEASE_LIMIT, | ||||
|             offset="", | ||||
|             limit=tiller.LIST_RELEASES_PAGE_SIZE, | ||||
|             status_codes=[ | ||||
|                 tiller.const.STATUS_DEPLOYED, tiller.const.STATUS_FAILED | ||||
|             ], | ||||
|             sort_by='LAST_RELEASED', | ||||
|             sort_order='DESC') | ||||
|  | ||||
|     @mock.patch('armada.handlers.tiller.K8s') | ||||
|     @mock.patch('armada.handlers.tiller.grpc') | ||||
|     @mock.patch.object(tiller, 'ListReleasesRequest') | ||||
|     @mock.patch.object(tiller, 'ReleaseServiceStub') | ||||
|     def test_list_releases_paged(self, mock_stub, mock_list_releases_request, | ||||
|                                  mock_grpc, _): | ||||
|         page_count = 3 | ||||
|         release_count = tiller.LIST_RELEASES_PAGE_SIZE * page_count | ||||
|         releases = [mock.Mock() for i in range(release_count)] | ||||
|         for i, release in enumerate(releases): | ||||
|             release.name = mock.PropertyMock(return_value=str(i)) | ||||
|         pages = [[ | ||||
|             mock.Mock( | ||||
|                 count=release_count, | ||||
|                 total=release_count + 5, | ||||
|                 next='' if i == page_count - 1 else str( | ||||
|                     (tiller.LIST_RELEASES_PAGE_SIZE * (i + 1))), | ||||
|                 releases=releases[tiller.LIST_RELEASES_PAGE_SIZE * | ||||
|                                   i:tiller.LIST_RELEASES_PAGE_SIZE * (i + 1)]) | ||||
|         ] for i in range(page_count)] | ||||
|         mock_stub.return_value.ListReleases.side_effect = pages | ||||
|  | ||||
|         mock_list_releases_side_effect = [ | ||||
|             mock.Mock() for i in range(page_count) | ||||
|         ] | ||||
|         mock_list_releases_request.side_effect = mock_list_releases_side_effect | ||||
|  | ||||
|         tiller_obj = tiller.Tiller('host', '8080', None) | ||||
|         self.assertEqual(releases, tiller_obj.list_releases()) | ||||
|  | ||||
|         mock_stub.assert_called_once_with(tiller_obj.channel) | ||||
|  | ||||
|         list_releases_calls = [ | ||||
|             mock.call( | ||||
|                 mock_list_releases_side_effect[i], | ||||
|                 tiller_obj.timeout, | ||||
|                 metadata=tiller_obj.metadata) for i in range(page_count) | ||||
|         ] | ||||
|         mock_stub.return_value.ListReleases.assert_has_calls( | ||||
|             list_releases_calls) | ||||
|  | ||||
|         list_release_request_calls = [ | ||||
|             mock.call( | ||||
|                 offset='' | ||||
|                 if i == 0 else str(tiller.LIST_RELEASES_PAGE_SIZE * i), | ||||
|                 limit=tiller.LIST_RELEASES_PAGE_SIZE, | ||||
|                 status_codes=[ | ||||
|                     tiller.const.STATUS_DEPLOYED, tiller.const.STATUS_FAILED | ||||
|                 ], | ||||
|                 sort_by='LAST_RELEASED', | ||||
|                 sort_order='DESC') for i in range(page_count) | ||||
|         ] | ||||
|         mock_list_releases_request.assert_has_calls(list_release_request_calls) | ||||
|  | ||||
|     @mock.patch('armada.handlers.tiller.K8s') | ||||
|     @mock.patch('armada.handlers.tiller.grpc') | ||||
|     @mock.patch.object(tiller, 'GetReleaseContentRequest') | ||||
|   | ||||
| @@ -41,3 +41,8 @@ Tiller Exceptions | ||||
|    :members: | ||||
|    :show-inheritance: | ||||
|    :undoc-members: | ||||
|  | ||||
| .. autoexception:: armada.exceptions.tiller_exceptions.TillerListReleasesPagingException | ||||
|    :members: | ||||
|    :show-inheritance: | ||||
|    :undoc-members: | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Sean Eagan
					Sean Eagan