Rafał Pitoń 10 лет назад
Родитель
Сommit
65ab7b3713
2 измененных файлов с 159 добавлено и 0 удалено
  1. 71 0
      misago/core/apipaginator.py
  2. 88 0
      misago/core/tests/test_apipaginator.py

+ 71 - 0
misago/core/apipaginator.py

@@ -0,0 +1,71 @@
+from django.core.paginator import InvalidPage, Paginator as DjangoPaginator
+from rest_framework.compat import OrderedDict
+from rest_framework.exceptions import NotFound
+from rest_framework.pagination import PageNumberPagination
+from rest_framework.response import Response
+
+
+class BaseApiPaginator(PageNumberPagination):
+    def paginate_queryset(self, queryset, request, view=None):
+        """
+        Paginate a queryset if required, either returning a
+        page object, or `None` if pagination is not configured for this view.
+        """
+        paginator = DjangoPaginator(
+            queryset, self.page_size, orphans=self.page_orphans)
+        page_number = request.query_params.get(self.page_query_param, 1)
+
+        try:
+            self.page = paginator.page(page_number)
+        except InvalidPage as exc:
+            msg = self.invalid_page_message.format(
+                page_number=page_number, message=six.text_type(exc)
+            )
+            raise NotFound(msg)
+
+        if paginator.count > 1 and self.template is not None:
+            # The browsable API should display pagination controls.
+            self.display_page_controls = True
+
+        self.request = request
+        return list(self.page)
+
+    def get_pagination_meta(self):
+        pagination = {
+            'pages': self.page.paginator.num_pages,
+            'first': None,
+            'previous': None,
+            'next': None,
+            'last': None,
+        }
+
+        if self.page.has_previous():
+            pagination['first'] = 1
+            if self.page.previous_page_number() > 1:
+                pagination['previous'] = self.page.previous_page_number()
+
+        if self.page.has_next():
+            pagination['last'] = self.page.paginator.num_pages
+            if self.page.next_page_number() < self.page.paginator.num_pages:
+                pagination['next'] = self.page.next_page_number()
+
+        return OrderedDict([
+            ('count', self.page.paginator.count),
+            ('pages', pagination['pages']),
+            ('first', pagination['first']),
+            ('previous', pagination['previous']),
+            ('next', pagination['next']),
+            ('last', pagination['last'])
+        ])
+
+    def get_paginated_response(self, data):
+        response_data = self.get_pagination_meta()
+        response_data['results'] = data
+        return Response(response_data)
+
+
+def ApiPaginator(per_page=None, orphans=0):
+    return type('ApiPaginator', (BaseApiPaginator, ), {
+        'page_size': per_page,
+        'page_orphans': orphans
+    })

+ 88 - 0
misago/core/tests/test_apipaginator.py

@@ -0,0 +1,88 @@
+from django.test import TestCase
+from misago.core.apipaginator import ApiPaginator
+
+
+class MockRequest(object):
+    def __init__(self, page=None):
+        self.query_params = {}
+        if page:
+            self.query_params['page'] = page
+
+
+class PaginatorTests(TestCase):
+    def test_init_paginator(self):
+        """ApiPaginator returns type that can be initalized"""
+        paginator = ApiPaginator(3, 1)()
+
+    def test_empty_queryset(self):
+        """pagination works for empty queryset"""
+        paginator = ApiPaginator(6, 2)()
+        querset = []
+
+        results = paginator.paginate_queryset(querset, MockRequest())
+        self.assertEqual(results, [])
+
+        meta = paginator.get_pagination_meta()
+        self.assertEqual(meta['count'], 0)
+        self.assertEqual(meta['pages'], 1)
+        self.assertEqual(meta['first'], None)
+        self.assertEqual(meta['previous'], None)
+        self.assertEqual(meta['next'], None)
+        self.assertEqual(meta['last'], None)
+
+    def test_first_page(self):
+        """pagination works for first page of queryset"""
+        paginator = ApiPaginator(6, 2)()
+        querset = [i for i in xrange(20)]
+
+        results = paginator.paginate_queryset(querset, MockRequest())
+        self.assertEqual(results, [0, 1, 2, 3, 4, 5])
+
+        meta = paginator.get_pagination_meta()
+        self.assertEqual(meta['count'], 20)
+        self.assertEqual(meta['pages'], 3)
+        self.assertEqual(meta['first'], None)
+        self.assertEqual(meta['previous'], None)
+        self.assertEqual(meta['next'], 2)
+        self.assertEqual(meta['last'], 3)
+
+        response = paginator.get_paginated_response(results)
+        self.assertEqual(response.status_code, 200)
+
+    def test_next_page(self):
+        """pagination works for next page of queryset"""
+        paginator = ApiPaginator(6, 2)()
+        querset = [i for i in xrange(20)]
+
+        results = paginator.paginate_queryset(querset, MockRequest(2))
+        self.assertEqual(results, [6, 7, 8, 9, 10, 11])
+
+        meta = paginator.get_pagination_meta()
+        self.assertEqual(meta['count'], 20)
+        self.assertEqual(meta['pages'], 3)
+        self.assertEqual(meta['first'], 1)
+        self.assertEqual(meta['previous'], None)
+        self.assertEqual(meta['next'], None)
+        self.assertEqual(meta['last'], 3)
+
+        response = paginator.get_paginated_response(results)
+        self.assertEqual(response.status_code, 200)
+
+    def test_last_page(self):
+        """pagination works for next page of queryset"""
+        paginator = ApiPaginator(6, 2)()
+        querset = [i for i in xrange(20)]
+
+        results = paginator.paginate_queryset(querset, MockRequest(3))
+        self.assertEqual(results, [12, 13, 14, 15, 16, 17, 18, 19])
+
+        meta = paginator.get_pagination_meta()
+        self.assertEqual(meta['count'], 20)
+        self.assertEqual(meta['pages'], 3)
+        self.assertEqual(meta['first'], 1)
+        self.assertEqual(meta['previous'], 2)
+        self.assertEqual(meta['next'], None)
+        self.assertEqual(meta['last'], None)
+
+        response = paginator.get_paginated_response(results)
+        self.assertEqual(response.status_code, 200)