Rafał Pitoń 7 лет назад
Родитель
Сommit
bca3f352c7
4 измененных файлов с 160 добавлено и 80 удалено
  1. 18 14
      misago/threads/signals.py
  2. 48 16
      misago/users/dataarchive.py
  3. 14 14
      misago/users/signals.py
  4. 80 36
      misago/users/tests/test_dataarchive.py

+ 18 - 14
misago/threads/signals.py

@@ -129,38 +129,42 @@ def delete_user_threads(sender, **kwargs):
 def archive_user_attachments(sender, archive=None, **kwargs):
 def archive_user_attachments(sender, archive=None, **kwargs):
     queryset = sender.attachment_set.order_by('id')
     queryset = sender.attachment_set.order_by('id')
     for attachment in chunk_queryset(queryset):
     for attachment in chunk_queryset(queryset):
-        item_path = [attachment.uploaded_on.year, attachment.uploaded_on.month, attachment.uploaded_on.day]
-        archive.add_model_file(attachment.image or attachment.file, item_path)
+        archive.add_model_file(
+            attachment.image or attachment.file,
+            prefix=attachment.uploaded_on.strftime('%H%M%S'),
+            date=attachment.uploaded_on,
+        )
 
 
 
 
 @receiver(archive_user_data)
 @receiver(archive_user_data)
 def archive_user_posts(sender, archive=None, **kwargs):
 def archive_user_posts(sender, archive=None, **kwargs):
     queryset = sender.post_set.order_by('id')
     queryset = sender.post_set.order_by('id')
     for post in chunk_queryset(queryset):
     for post in chunk_queryset(queryset):
-        item_name = post.posted_on.strftime('post_%H%M%S')
-        item_path = [post.posted_on.year, post.posted_on.month, post.posted_on.day]
-        archive.add_text(item_name, post.parsed, item_path)
+        item_name = post.posted_on.strftime('%H%M%S-post')
+        archive.add_text(item_name, post.parsed, date=post.posted_on)
 
 
 
 
 @receiver(archive_user_data)
 @receiver(archive_user_data)
 def archive_user_posts_edits(sender, archive=None, **kwargs):
 def archive_user_posts_edits(sender, archive=None, **kwargs):
     queryset = sender.postedit_set.order_by('id')
     queryset = sender.postedit_set.order_by('id')
     for post_edit in chunk_queryset(queryset):
     for post_edit in chunk_queryset(queryset):
-        item_name = post_edit.edited_on.strftime('post_edit_%H%M%S')
-        item_path = [post_edit.edited_on.year, post_edit.edited_on.month, post_edit.edited_on.day]
-        archive.add_text(item_name, post_edit.edited_from, item_path)
+        item_name = post_edit.edited_on.strftime('%H%M%S-post-edit')
+        archive.add_text(item_name, post_edit.edited_from, date=post_edit.edited_on)
 
 
 
 
 @receiver(archive_user_data)
 @receiver(archive_user_data)
 def archive_user_polls(sender, archive=None, **kwargs):
 def archive_user_polls(sender, archive=None, **kwargs):
     queryset = sender.poll_set.order_by('id')
     queryset = sender.poll_set.order_by('id')
     for poll in chunk_queryset(queryset):
     for poll in chunk_queryset(queryset):
-        item_name = poll.posted_on.strftime('poll_%H%M%S')
-        item_path = [poll.posted_on.year, poll.posted_on.month, poll.posted_on.day]
-        archive.add_dict(item_name, OrderedDict([
-            (_("Question"), poll.question),
-            (_("Choices"), u', '.join([c['label'] for c in poll.choices])),
-        ]), item_path)
+        item_name = poll.posted_on.strftime('%H%M%S-poll')
+        archive.add_dict(
+            item_name,
+            OrderedDict([
+                (_("Question"), poll.question),
+                (_("Choices"), u', '.join([c['label'] for c in poll.choices])),
+            ]),
+            date=poll.posted_on,
+        )
 
 
 
 
 @receiver(anonymize_user_content)
 @receiver(anonymize_user_content)

+ 48 - 16
misago/users/dataarchive.py

@@ -9,6 +9,9 @@ from django.utils import six
 from misago.core.utils import slugify
 from misago.core.utils import slugify
 
 
 
 
+FILENAME_MAX_LEN = 50
+
+
 class DataArchive(object):
 class DataArchive(object):
     def __init__(self, user, working_dir_path):
     def __init__(self, user, working_dir_path):
         self.user = user
         self.user = user
@@ -70,28 +73,35 @@ class DataArchive(object):
             os.remove(self.file_path)
             os.remove(self.file_path)
             self.file_path = None
             self.file_path = None
 
 
-    def add_text(self, name, value, path=None):
+    def add_text(self, name, value, date=None, directory=None):
         clean_filename = slugify(str(name))
         clean_filename = slugify(str(name))
-        file_dir_path = self.make_path(path)
+        file_dir_path = self.make_final_path(date=date, directory=directory)
         file_path = os.path.join(file_dir_path, '{}.txt'.format(clean_filename))
         file_path = os.path.join(file_dir_path, '{}.txt'.format(clean_filename))
         with open(file_path, 'w+') as fp:
         with open(file_path, 'w+') as fp:
             fp.write(six.text_type(value))
             fp.write(six.text_type(value))
             return file_path
             return file_path
 
 
-    def add_dict(self, name, value, path=None):
+    def add_dict(self, name, value, date=None, directory=None):
         text_lines = []
         text_lines = []
         for key, value in value.items():
         for key, value in value.items():
             text_lines.append(u"{}: {}".format(key, value))
             text_lines.append(u"{}: {}".format(key, value))
         text = u'\n'.join(text_lines)
         text = u'\n'.join(text_lines)
-        return self.add_text(name, text, path)
+        return self.add_text(name, text, date=date, directory=directory)
 
 
-    def add_model_file(self, model_file, path=None):
+    def add_model_file(self, model_file, prefix=None, date=None, directory=None):
         if not model_file:
         if not model_file:
             return None
             return None
 
 
-        clean_filename = model_file.name.split('/')[-1]
-        target_dir_path = self.make_path(path)
-        target_path = os.path.join(target_dir_path, clean_filename)
+        target_dir_path = self.make_final_path(date=date, directory=directory)
+
+        filename = os.path.basename(model_file.name)
+        if prefix:
+            prefixed_filename = u"{}-{}".format(prefix, filename)
+            clean_filename = trim_long_filename(prefixed_filename)
+            target_path = os.path.join(target_dir_path, clean_filename)
+        else:
+            clean_filename = trim_long_filename(filename)
+            target_path = os.path.join(target_dir_path, clean_filename)
 
 
         with open(target_path, 'wb') as fp:
         with open(target_path, 'wb') as fp:
             for chunk in model_file.chunks():
             for chunk in model_file.chunks():
@@ -99,18 +109,29 @@ class DataArchive(object):
 
 
         return target_path
         return target_path
 
 
-    def make_path(self, path):
-        # fixme: this can be simplified in py37k
-        final_path = self.data_dir_path
+    def make_final_path(self, date=None, directory=None):
+        # fixme: os.path.isdir test can be avoided in py37k
+        if date and directory:
+            raise ValueError("date and directory arguments are mutually exclusive")
 
 
-        if not path:
+        data_dir_path = self.data_dir_path
+
+        if date:
+            final_path = data_dir_path
+            path_items = [date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')]
+            for path_item in path_items:
+                final_path = os.path.join(final_path, six.text_type(path_item))
+                if not os.path.isdir(final_path):
+                    os.mkdir(final_path)
             return final_path
             return final_path
 
 
-        for path_item in path:
-            final_path = os.path.join(final_path, six.text_type(path_item))
+        if directory:
+            final_path = os.path.join(data_dir_path, six.text_type(directory))
             if not os.path.isdir(final_path):
             if not os.path.isdir(final_path):
                 os.mkdir(final_path)
                 os.mkdir(final_path)
-        return final_path
+            return final_path
+
+        return data_dir_path
 
 
 
 
 def get_tmp_filename(user):
 def get_tmp_filename(user):
@@ -120,4 +141,15 @@ def get_tmp_filename(user):
         get_random_string(6),
         get_random_string(6),
     ]
     ]
 
 
-    return '-'.join(filename_bits)
+    return '-'.join(filename_bits)
+
+
+def trim_long_filename(filename):
+    # fixme: consider moving this utility to better place?
+    # eg. to trim too long attachment filenames on upload
+    if len(filename) < FILENAME_MAX_LEN:
+        return filename
+
+    name, extension = os.path.splitext(filename)
+    name_len = FILENAME_MAX_LEN - len(extension)
+    return u'{}{}'.format(name[:name_len], extension)

+ 14 - 14
misago/users/signals.py

@@ -48,32 +48,32 @@ def archive_user_profile_fields(sender, archive=None, **kwargs):
 
 
 @receiver(archive_user_data)
 @receiver(archive_user_data)
 def archive_user_avatar(sender, archive=None, **kwargs):
 def archive_user_avatar(sender, archive=None, **kwargs):
-    avatars_path = ['avatar']
-
-    archive.add_model_file(sender.avatar_tmp, avatars_path)
-    archive.add_model_file(sender.avatar_src, avatars_path)
+    archive.add_model_file(sender.avatar_tmp, directory='avatar')
+    archive.add_model_file(sender.avatar_src, directory='avatar')
     for avatar in sender.avatar_set.iterator():
     for avatar in sender.avatar_set.iterator():
-        archive.add_model_file(avatar.image, avatars_path)
+        archive.add_model_file(avatar.image, directory='avatar')
 
 
 
 
 @receiver(archive_user_data)
 @receiver(archive_user_data)
 def archive_user_audit_trail(sender, archive=None, **kwargs):
 def archive_user_audit_trail(sender, archive=None, **kwargs):
     queryset = sender.audittrail_set.order_by('id')
     queryset = sender.audittrail_set.order_by('id')
     for audit_trail in chunk_queryset(queryset):
     for audit_trail in chunk_queryset(queryset):
-        item_name = audit_trail.created_at.strftime('audit_trail_%H%M%S')
-        item_path = [audit_trail.created_at.year, audit_trail.created_at.month, audit_trail.created_at.day]
-        archive.add_text(item_name, audit_trail.ip_address, item_path)
+        item_name = audit_trail.created_at.strftime('%H%M%S-audit-trail')
+        archive.add_text(item_name, audit_trail.ip_address, date=audit_trail.created_at)
 
 
 
 
 @receiver(archive_user_data)
 @receiver(archive_user_data)
 def archive_user_name_history(sender, archive=None, **kwargs):
 def archive_user_name_history(sender, archive=None, **kwargs):
     for name_change in sender.namechanges.order_by('id').iterator():
     for name_change in sender.namechanges.order_by('id').iterator():
-        item_name = name_change.changed_on.strftime('name_change_%H%M%S')
-        item_path = [name_change.changed_on.year, name_change.changed_on.month, name_change.changed_on.day]
-        archive.add_dict(item_name, OrderedDict([
-            (_("New username"), name_change.new_username),
-            (_("Old username"), name_change.old_username),
-        ]), item_path)
+        item_name = name_change.changed_on.strftime('%H%M%S-name-change')
+        archive.add_dict(
+            item_name,
+            OrderedDict([
+                (_("New username"), name_change.new_username),
+                (_("Old username"), name_change.old_username),
+            ]),
+            date=name_change.changed_on,
+        )
 
 
 
 
 @receiver(username_changed)
 @receiver(username_changed)

+ 80 - 36
misago/users/tests/test_dataarchive.py

@@ -3,9 +3,11 @@ import os
 from collections import OrderedDict
 from collections import OrderedDict
 
 
 from django.core.files import File
 from django.core.files import File
+from django.test import TestCase
+from django.utils import timezone
 
 
 from misago.conf import settings
 from misago.conf import settings
-from misago.users.dataarchive import DataArchive
+from misago.users.dataarchive import FILENAME_MAX_LEN, DataArchive, trim_long_filename
 from misago.users.testutils import AuthenticatedUserTestCase
 from misago.users.testutils import AuthenticatedUserTestCase
 
 
 
 
@@ -79,21 +81,6 @@ class DataArchiveTests(AuthenticatedUserTestCase):
                 saved_data = fp.read().strip()
                 saved_data = fp.read().strip()
                 self.assertEqual(saved_data, str(data_to_write))
                 self.assertEqual(saved_data, str(data_to_write))
 
 
-    def test_add_text_path(self):
-        """add_dict method creates text file under path"""
-        with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
-            data_to_write = u"Hello, łorld!"
-            file_path = archive.add_text(
-                'testfile', data_to_write, path=['avatars', 'tmp'])
-            self.assertTrue(os.path.isfile(file_path))
-
-            valid_output_path = os.path.join(archive.data_dir_path, 'testfile.txt')
-            self.assertEqual(file_path, valid_output_path)
-
-            data_dir_path = str(archive.data_dir_path)
-            self.assertTrue(str(valid_output_path).startswith(data_dir_path))
-            self.assertIn('/avatars/tmp/', str(valid_output_path))
-
     def test_add_dict(self):
     def test_add_dict(self):
         """add_dict method creates text file from dict"""
         """add_dict method creates text file from dict"""
         with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
         with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
@@ -122,21 +109,6 @@ class DataArchiveTests(AuthenticatedUserTestCase):
                 saved_data = fp.read().strip()
                 saved_data = fp.read().strip()
                 self.assertEqual(saved_data, u"first: łorld!\nsecond: łup!")
                 self.assertEqual(saved_data, u"first: łorld!\nsecond: łup!")
 
 
-    def test_add_dict_path(self):
-        """add_dict method creates text file under path"""
-        with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
-            data_to_write = {'first': u"łorld!", 'second': u"łup!"}
-            file_path = archive.add_dict(
-                'testfile', data_to_write, path=['avatars', 'tmp']))
-            self.assertTrue(os.path.isfile(file_path))
-
-            valid_output_path = os.path.join(archive.data_dir_path, 'testfile.txt')
-            self.assertEqual(file_path, valid_output_path)
-
-            data_dir_path = str(archive.data_dir_path)
-            self.assertTrue(str(valid_output_path).startswith(data_dir_path))
-            self.assertIn('/avatars/tmp/', str(valid_output_path))
-
     def test_add_model_file(self):
     def test_add_model_file(self):
         """add_model_file method adds model file"""
         """add_model_file method adds model file"""
         with open(TEST_AVATAR_PATH, 'rb') as avatar:
         with open(TEST_AVATAR_PATH, 'rb') as avatar:
@@ -159,21 +131,73 @@ class DataArchiveTests(AuthenticatedUserTestCase):
             self.assertIsNone(file_path)
             self.assertIsNone(file_path)
             self.assertFalse(os.listdir(archive.data_dir_path))
             self.assertFalse(os.listdir(archive.data_dir_path))
 
 
-    def test_add_model_file_path(self):
-        """add_model_file method adds model file under path"""
+    def test_add_model_file_prefixed(self):
+        """add_model_file method adds model file with prefix"""
         with open(TEST_AVATAR_PATH, 'rb') as avatar:
         with open(TEST_AVATAR_PATH, 'rb') as avatar:
             self.user.avatar_tmp = File(avatar)
             self.user.avatar_tmp = File(avatar)
             self.user.save()
             self.user.save()
 
 
         with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
         with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
-            file_path = archive.add_model_file(
-                self.user.avatar_tmp, path=['avatars', 'tmp'])
+            file_path = archive.add_model_file(self.user.avatar_tmp, prefix="prefix")
 
 
             self.assertTrue(os.path.isfile(file_path))
             self.assertTrue(os.path.isfile(file_path))
     
     
             data_dir_path = str(archive.data_dir_path)
             data_dir_path = str(archive.data_dir_path)
             self.assertTrue(str(file_path).startswith(data_dir_path))
             self.assertTrue(str(file_path).startswith(data_dir_path))
-            self.assertIn('/avatars/tmp/', str(file_path))
+            
+            filename = os.path.basename(self.user.avatar_tmp.name)
+            target_filename = 'prefix-{}'.format(filename)
+            self.assertTrue(str(file_path).endswith(target_filename))
+
+    def test_make_final_path_no_kwargs(self):
+        """make_final_path returns data_dir_path if no kwargs are set"""
+        with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
+            final_path = archive.make_final_path()
+            self.assertEqual(final_path, archive.data_dir_path)
+
+    def test_make_final_path_directory(self):
+        """make_final_path returns path including directory name"""
+        with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
+            final_path = archive.make_final_path(directory='test-directory')
+            valid_path = os.path.join(archive.data_dir_path, 'test-directory')
+            self.assertEqual(final_path, valid_path)
+
+    def test_make_final_path_date(self):
+        """make_final_path returns path including date segments"""
+        with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
+            now = timezone.now().date()
+            final_path = archive.make_final_path(date=now)
+            
+            valid_path = os.path.join(
+                archive.data_dir_path,
+                now.strftime('%Y'),
+                now.strftime('%m'),
+                now.strftime('%d')
+            )
+
+            self.assertEqual(final_path, valid_path)
+
+    def test_make_final_path_datetime(self):
+        """make_final_path returns path including date segments"""
+        with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
+            now = timezone.now()
+            final_path = archive.make_final_path(date=now)
+            
+            valid_path = os.path.join(
+                archive.data_dir_path,
+                now.strftime('%Y'),
+                now.strftime('%m'),
+                now.strftime('%d')
+            )
+
+            self.assertEqual(final_path, valid_path)
+
+    def test_make_final_path_both_kwargs(self):
+        """make_final_path raises value error if both date and directory are set"""
+        with DataArchive(self.user, DATA_DOWNLOADS_WORKING_DIR) as archive:
+            expected_message = "date and directory arguments are mutually exclusive"
+            with self.assertRaisesMessage(ValueError, expected_message):
+                archive.make_final_path(date=timezone.now(), directory='test')
 
 
     def test_get_file(self):
     def test_get_file(self):
         """get_file returns django file"""
         """get_file returns django file"""
@@ -196,3 +220,23 @@ class DataArchiveTests(AuthenticatedUserTestCase):
         self.assertIsNone(archive.file)
         self.assertIsNone(archive.file)
         self.assertIsNone(archive.file_path)
         self.assertIsNone(archive.file_path)
         self.assertTrue(django_file.closed)
         self.assertTrue(django_file.closed)
+
+
+class TrimLongFilenameTests(TestCase):
+    def test_trim_short_filename(self):
+        """trim_too_long_filename returns short filename as it is"""
+        filename = 'filename.jpg'
+        trimmed_filename = trim_long_filename(filename)
+        self.assertEqual(trimmed_filename, filename)
+
+    def test_trim_too_long_filename(self):
+        """trim_too_long_filename trims filename if its longer than allowed"""
+        filename = 'filename'
+        extension = '.jpg'
+        long_filename = '{}{}'.format(filename * 10, extension)
+
+        trimmed_filename = trim_long_filename(long_filename)
+        
+        self.assertEqual(len(trimmed_filename), FILENAME_MAX_LEN)
+        self.assertTrue(trimmed_filename.startswith(filename))
+        self.assertTrue(trimmed_filename.endswith(extension))