Browse Source

DataCollector utility class

Rafał Pitoń 7 years ago
parent
commit
827b9ba9ed
2 changed files with 101 additions and 5 deletions
  1. 10 4
      misago/users/datacollector.py
  2. 91 1
      misago/users/tests/test_datacollector.py

+ 10 - 4
misago/users/datacollector.py

@@ -84,10 +84,16 @@ class DataCollector(DataWriter):
     def create_archive(self):
         archive_name = self.get_tmp_dir_name()
         archive_path = os.path.join(self.working_dir_path, archive_name)
-        shutil.make_archive(archive_path, 'zip', self.tmp_dir_path)
 
-        self.archive_path = archive_path
-        return archive_path
+        self.archive_path = shutil.make_archive(archive_path, 'zip', self.tmp_dir_path)
+        return self.archive_path
+
+    def delete_archive(self):
+        if self.archive_path:
+            os.remove(self.archive_path)
+            self.archive_path = None
 
     def delete_tmp_dir(self):
-        shutil.rmtree(self.tmp_dir_path)
+        if self.tmp_dir_path:
+            shutil.rmtree(self.tmp_dir_path)
+            self.tmp_dir_path = None

+ 91 - 1
misago/users/tests/test_datacollector.py

@@ -66,4 +66,94 @@ class DataCollectorTests(AuthenticatedUserTestCase):
         file_path = data_collector.write_model_file(self.user.avatar_tmp)
         
         self.assertIsNone(file_path)
-        self.assertFalse(os.listdir(data_collector.data_dir_path))
+        self.assertFalse(os.listdir(data_collector.data_dir_path))
+
+    def test_create_collection(self):
+        """create_collection creates new directory for collection"""
+        data_collector = DataCollector(self.user, DATA_DOWNLOADS_WORKING_DIR)
+        collection = data_collector.create_collection('collection')
+
+        data_dir_path = str(data_collector.data_dir_path)
+        collection_dir_path = str(collection.data_dir_path)
+        self.assertNotEqual(data_dir_path, collection_dir_path)
+        self.assertTrue(collection_dir_path.startswith(data_dir_path))
+
+        self.assertTrue(os.path.exists(collection.data_dir_path))
+
+    def test_collect_write_data_file(self):
+        """write_data_file creates new data file in collection data_dir_path"""
+        data_collector = DataCollector(self.user, DATA_DOWNLOADS_WORKING_DIR)
+        collection = data_collector.create_collection('collection')
+
+        data_to_write = {'hello': "I am test!", 'nice': u"łał!"}
+        data_file_path = collection.write_data_file("testfile", data_to_write)
+        self.assertTrue(os.path.isfile(data_file_path))
+
+        valid_output_path = os.path.join(collection.data_dir_path, 'testfile.txt')
+        self.assertEqual(data_file_path, valid_output_path)
+
+        with open(data_file_path, 'r') as fp:
+            saved_data = fp.read().strip().splitlines()
+            self.assertEqual(saved_data, ["hello: I am test!", u"nice: łał!"])
+
+    def test_collect_write_model_file(self):
+        """write_model_file includes model file in collection data_dir_path"""
+        with open(TEST_AVATAR_PATH, 'rb') as avatar:
+            self.user.avatar_tmp = File(avatar)
+            self.user.save()
+
+        data_collector = DataCollector(self.user, DATA_DOWNLOADS_WORKING_DIR)
+        collection = data_collector.create_collection('collection')
+
+        file_path = collection.write_model_file(self.user.avatar_tmp)
+        
+        self.assertTrue(os.path.isfile(file_path))
+    
+        data_dir_path = str(collection.data_dir_path)
+        self.assertTrue(str(file_path).startswith(data_dir_path))
+
+    def test_collect_write_model_file_empty(self):
+        """write_model_file is noop if model file field is none"""
+        data_collector = DataCollector(self.user, DATA_DOWNLOADS_WORKING_DIR)
+        collection = data_collector.create_collection('collection')
+
+        file_path = collection.write_model_file(self.user.avatar_tmp)
+        
+        self.assertIsNone(file_path)
+        self.assertFalse(os.listdir(collection.data_dir_path))
+
+    def test_create_archive(self):
+        """create_archive creates zip file from collected data"""
+        data_collector = DataCollector(self.user, DATA_DOWNLOADS_WORKING_DIR)
+        
+        data_to_write = {'hello': "I am test!", 'nice': u"łał!"}
+        data_collector.write_data_file("testfile", data_to_write)
+
+        with open(TEST_AVATAR_PATH, 'rb') as avatar:
+            self.user.avatar_tmp = File(avatar)
+            self.user.save()
+        data_collector.write_model_file(self.user.avatar_tmp)
+
+        archive_path = data_collector.create_archive()
+        self.assertEqual(data_collector.archive_path, archive_path)
+        self.assertTrue(os.path.isfile(archive_path))
+
+    def test_delete_archive(self):
+        """delete_archive deletes zip file"""
+        data_collector = DataCollector(self.user, DATA_DOWNLOADS_WORKING_DIR)
+        archive_path = data_collector.create_archive()
+        data_collector.delete_archive()
+        self.assertFalse(os.path.isfile(archive_path))
+
+    def test_delete_archive_none(self):
+        """delete_archive is noop if zip file doesnt exist"""
+        data_collector = DataCollector(self.user, DATA_DOWNLOADS_WORKING_DIR)
+        self.assertIsNone(data_collector.archive_path)
+        data_collector.delete_archive()
+
+    def test_delete_tmp_dir(self):
+        """delete_tmp_dir delete's directory but leaves archive"""
+        data_collector = DataCollector(self.user, DATA_DOWNLOADS_WORKING_DIR)
+        tmp_dir_path = data_collector.tmp_dir_path
+        data_collector.delete_tmp_dir()
+        self.assertFalse(os.path.exists(tmp_dir_path))