diff --git a/rbackup/struct/repository.py b/rbackup/struct/repository.py index cacc6ed..24eb741 100644 --- a/rbackup/struct/repository.py +++ b/rbackup/struct/repository.py @@ -46,6 +46,7 @@ class Repository(Hierarchy): Methods ------- + * cleanup - clean all repository data * create_snapshot - create a new snapshot * gen_metadata (inherited from Hierarchy) * is_valid_snapshot_name - validate a potential name for a snapshot @@ -207,3 +208,41 @@ class Repository(Hierarchy): syslog.debug(f"Snapshot name: {new_snapshot.name}") return new_snapshot + + def cleanup(self, *, remove_snapshots=False, remove_repo_dir=False): + """Clean up any filesystem references to this repository. + By default, no snapshots are deleted. + + :param remove_snapshots: delete the data directory of this repository + :type remove_snapshots: bool + :param remove_repo_dir: remove the top-directory level of this repository + :type remove_repo_dir: bool + """ + # We don't want to risk symlink attacks + if not shutil.rmtree.avoids_symlink_attacks: + syslog.error( + "shutil cannot avoid symlink attacks on this platform. Ignoring." + ) + return + + syslog.debug("Cleaning repository data") + + self.metadata_path.unlink() + syslog.info("Removing repository metadata") + syslog.debug(f"Repository metadata removed: {self.metadata_path}") + + if remove_snapshots: + try: + shutil.rmtree(self.snapshot_dir) + except PermissionError as e: + syslog.error(e) + else: + syslog.info("Removed snapshots") + + if remove_repo_dir: + try: + self.path.unlink() + except PermissionError as e: + syslog.error(e) + else: + syslog.info(f"Removed repository directory: {self.path}") diff --git a/tests/test_repository.py b/tests/test_repository.py index fa9674c..d032267 100644 --- a/tests/test_repository.py +++ b/tests/test_repository.py @@ -180,9 +180,59 @@ class TestRepositoryPostCreate(unittest.TestCase): repo.create_snapshot(name) self.assertTrue(name in repo) + self.assertEqual(len(repo), 1) def tearDown(self): self.patched_path.stop() self.patched_r_metadata.stop() self.patched_w_metadata.stop() self.patched_snapshot.stop() + + +class TestRepositoryCleanup(unittest.TestCase): + """Test that repository cleanup works properly. + + Test cases + ---------- + * Function stops if system is not symlink attack-resistant + * If symlink attack-resistant, then only delete metadata when all others false + * Function only deletes snapshots when told to + * Function only deletes repository directory when told to + """ + + def setUp(self): + self.patched_path = patch.object( + Repository, "metadata_path", new_callable=PropertyMock + ) + self.patched_r_metadata = patch.object( + Repository, "read_metadata", spec_set=list + ) + self.patched_w_metadata = patch.object( + Repository, "write_metadata", spec_set=list + ) + self.patched_shutil = patch(f"{TESTING_PACKAGE}.repository.shutil") + self.patched_snapshot = patch( + f"{TESTING_PACKAGE}.repository.Snapshot", spec_set=Snapshot + ) + + self.mocked_path = self.patched_path.start() + self.mocked_r_metadata = self.patched_r_metadata.start() + self.mocked_w_metadata = self.patched_w_metadata.start() + self.mocked_shutil = self.patched_shutil.start() + self.mocked_snapshot = self.patched_snapshot.start() + + def test_stops_on_non_symlink_resistant(self): + self.mocked_shutil.rmtree.avoids_symlink_attacks = False + repo = Repository("backup") + + repo.cleanup(remove_snapshots=True) + + self.mocked_path.return_value.unlink.assert_not_called() + self.mocked_shutil.rmtree.assert_not_called() + + def tearDown(self): + self.patched_path.stop() + self.patched_r_metadata.stop() + self.patched_w_metadata.stop() + self.patched_shutil.stop() + self.patched_snapshot.stop()