# -*- coding: utf-8 -*- # Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from typing import ( Callable, Iterator, Sequence, Tuple, ) from google.longrunning import operations_pb2 from google.api_core.operations_v1.pagers_base import ListOperationsPagerBase class ListOperationsPager(ListOperationsPagerBase): """A pager for iterating through ``list_operations`` requests. This class thinly wraps an initial :class:`google.longrunning.operations_pb2.ListOperationsResponse` object, and provides an ``__iter__`` method to iterate through its ``operations`` field. If there are more pages, the ``__iter__`` method will make additional ``ListOperations`` requests and continue to iterate through the ``operations`` field on the corresponding responses. All the usual :class:`google.longrunning.operations_pb2.ListOperationsResponse` attributes are available on the pager. If multiple requests are made, only the most recent response is retained, and thus used for attribute lookup. """ def __init__( self, method: Callable[..., operations_pb2.ListOperationsResponse], request: operations_pb2.ListOperationsRequest, response: operations_pb2.ListOperationsResponse, *, metadata: Sequence[Tuple[str, str]] = () ): super().__init__( method=method, request=request, response=response, metadata=metadata ) @property def pages(self) -> Iterator[operations_pb2.ListOperationsResponse]: yield self._response while self._response.next_page_token: self._request.page_token = self._response.next_page_token self._response = self._method(self._request, metadata=self._metadata) yield self._response def __iter__(self) -> Iterator[operations_pb2.Operation]: for page in self.pages: yield from page.operations