summaryrefslogtreecommitdiff
path: root/tests/feed.py
diff options
context:
space:
mode:
authorA Farzat <a@farzat.xyz>2025-08-22 16:08:58 +0300
committerA Farzat <a@farzat.xyz>2025-08-22 16:08:58 +0300
commite91e1dc571141ff0fc05e477ea4b66ccdf4a3ffd (patch)
tree11988f94366a2312fc1ac340e7bcf95f48c7ddd9 /tests/feed.py
parenta059925d4947306d6d9ad43687eb16c47f8ffece (diff)
downloadcsca5028-e91e1dc571141ff0fc05e477ea4b66ccdf4a3ffd.tar.gz
csca5028-e91e1dc571141ff0fc05e477ea4b66ccdf4a3ffd.zip
Add unit tests
Diffstat (limited to 'tests/feed.py')
-rw-r--r--tests/feed.py68
1 files changed, 68 insertions, 0 deletions
diff --git a/tests/feed.py b/tests/feed.py
new file mode 100644
index 0000000..aaeb96a
--- /dev/null
+++ b/tests/feed.py
@@ -0,0 +1,68 @@
+from typing import Any
+from mongomock import MongoClient
+from pymongo.collection import Collection
+from schedule import Scheduler
+from unittest import TestCase
+from components.subscriptions.main import Subscription
+from components.subscriptions.typing import SubsDict
+from components.users.typing import UserDict
+
+
+class TestFeeds(TestCase):
+ def setUp(self) -> None:
+ self.client: MongoClient[Any] = MongoClient(tz_aware=True)
+ self.collection: Collection[SubsDict] = self.client.db.collection
+ self.scheduler: Scheduler = Scheduler()
+
+ def test_insert(self) -> None:
+ sub = Subscription(
+ _id="yt:channel:bla",
+ link="http://www.youtube.com/feeds/videos.xml?channel_id=UCbla",
+ time_between_fetches=5,
+ )
+ sub._collection = self.collection
+ sub._scheduler = self.scheduler
+ sub.insert()
+ sub_dict = self.collection.find_one({"_id": "yt:channel:bla"})
+ self.assertIsNotNone(sub_dict)
+ assert sub_dict # To appease mypy.
+ self.assertDictEqual(sub.asdict(), sub_dict)
+
+ def test_feed_fetch(self) -> None:
+ sub = Subscription(
+ _id="yt:channel:hlgI3UHCOnwUGzWzbJ3H5w",
+ link=r"tests/data/feed@ytnnews24@001.xml",
+ time_between_fetches=1,
+ )
+ sub._collection = self.collection
+ sub._scheduler = self.scheduler
+ sub.insert()
+ sub.initialise_job()
+ self.scheduler.run_all()
+ self.assertEqual(15, len(sub.videos))
+ sub_dict = self.collection.find_one({"_id": "yt:channel:hlgI3UHCOnwUGzWzbJ3H5w"})
+ self.assertIsNotNone(sub_dict)
+ assert sub_dict # To appease mypy.
+ self.assertEqual(15, len(sub_dict["videos"]))
+
+ def test_feed_update(self) -> None:
+ sub = Subscription(
+ _id="yt:channel:hlgI3UHCOnwUGzWzbJ3H5w",
+ link=r"tests/data/feed@ytnnews24@001.xml",
+ time_between_fetches=1,
+ )
+ sub._collection = self.collection
+ sub._scheduler = self.scheduler
+ sub.insert()
+ sub.initialise_job()
+ self.scheduler.run_all()
+ sub.link=r"tests/data/feed@ytnnews24@002.xml"
+ self.scheduler.run_all()
+ self.assertEqual(16, len(sub.videos))
+ sub_dict = self.collection.find_one({"_id": "yt:channel:hlgI3UHCOnwUGzWzbJ3H5w"})
+ self.assertIsNotNone(sub_dict)
+ assert sub_dict # To appease mypy.
+ self.assertEqual(16, len(sub_dict["videos"]))
+
+ def tearDown(self) -> None:
+ self.client.close()