Added a test for submitter (#1944)

This commit is contained in:
Soxoj
2024-12-08 13:35:27 +01:00
committed by GitHub
parent c66d776f8a
commit 4eada16b94
8 changed files with 105 additions and 30 deletions
+3 -1
View File
@@ -381,7 +381,9 @@ def process_site_result(
extracted_ids_data = extract_ids_data(html_text, logger, site) extracted_ids_data = extract_ids_data(html_text, logger, site)
if extracted_ids_data: if extracted_ids_data:
new_usernames = parse_usernames(extracted_ids_data, logger) new_usernames = parse_usernames(extracted_ids_data, logger)
results_info = update_results_info(results_info, extracted_ids_data, new_usernames) results_info = update_results_info(
results_info, extracted_ids_data, new_usernames
)
result.ids_data = extracted_ids_data result.ids_data = extracted_ids_data
# Save status of request # Save status of request
+1 -1
View File
@@ -174,4 +174,4 @@ def notify_about_errors(
('You can see detailed site check errors with a flag `--print-errors`', '-') ('You can see detailed site check errors with a flag `--print-errors`', '-')
) )
return results return results
+3 -1
View File
@@ -665,7 +665,9 @@ async def main():
check_domains=args.with_domains, check_domains=args.with_domains,
) )
errs = errors.notify_about_errors(results, query_notify, show_statistics=args.verbose) errs = errors.notify_about_errors(
results, query_notify, show_statistics=args.verbose
)
for e in errs: for e in errs:
query_notify.warning(*e) query_notify.warning(*e)
+10 -4
View File
@@ -1,6 +1,7 @@
import asyncio import asyncio
import json import json
import re import re
import os
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from aiohttp import ClientSession, TCPConnector from aiohttp import ClientSession, TCPConnector
@@ -61,7 +62,10 @@ class Submitter:
proxy = self.args.proxy proxy = self.args.proxy
cookie_jar = None cookie_jar = None
if args.cookie_file: if args.cookie_file:
cookie_jar = import_aiohttp_cookies(args.cookie_file) if not os.path.exists(args.cookie_file):
logger.error(f"Cookie file {args.cookie_file} does not exist!")
else:
cookie_jar = import_aiohttp_cookies(args.cookie_file)
connector = ProxyConnector.from_url(proxy) if proxy else TCPConnector(ssl=False) connector = ProxyConnector.from_url(proxy) if proxy else TCPConnector(ssl=False)
connector.verify_ssl = False connector.verify_ssl = False
@@ -123,7 +127,9 @@ class Submitter:
async def detect_known_engine( async def detect_known_engine(
self, url_exists, url_mainpage self, url_exists, url_mainpage
) -> [List[MaigretSite], str]: ) -> [List[MaigretSite], str]:
resp_text = '' resp_text = ''
try: try:
r = await self.session.get(url_mainpage) r = await self.session.get(url_mainpage)
content = await r.content.read() content = await r.content.read()
@@ -131,8 +137,8 @@ class Submitter:
resp_text = content.decode(charset, "ignore") resp_text = content.decode(charset, "ignore")
self.logger.debug(resp_text) self.logger.debug(resp_text)
except Exception as e: except Exception as e:
self.logger.warning(e) self.logger.warning(e, exc_info=True)
print("Some error while checking main page") print(f"Some error while checking main page: {e}")
return [], resp_text return [], resp_text
for engine in self.db.engines: for engine in self.db.engines:
@@ -160,7 +166,7 @@ class Submitter:
for u in usernames_to_check: for u in usernames_to_check:
site_data = { site_data = {
"urlMain": url_mainpage, "urlMain": url_mainpage,
"name": url_mainpage.split("//")[1], "name": url_mainpage.split("//")[1].split("/")[0],
"engine": engine_name, "engine": engine_name,
"usernameClaimed": u, "usernameClaimed": u,
"usernameUnclaimed": "noonewouldeverusethis7", "usernameUnclaimed": "noonewouldeverusethis7",
+19 -1
View File
@@ -1,5 +1,23 @@
{ {
"engines": {}, "engines": {
"Discourse": {
"name": "Discourse",
"site": {
"presenseStrs": [
"<meta name=\"generator\" content=\"Discourse"
],
"absenceStrs": [
"Oops! That page doesn\u2019t exist or is private.",
"wrap not-found-container"
],
"checkType": "message",
"url": "{urlMain}/u/{username}/summary"
},
"presenseStrs": [
"<meta name=\"generator\" content=\"Discourse"
]
}
},
"sites": { "sites": {
"ValidActive": { "ValidActive": {
"tags": ["global", "us"], "tags": ["global", "us"],
+37 -7
View File
@@ -6,18 +6,48 @@ from maigret.result import MaigretCheckResult, MaigretCheckStatus
def test_notify_about_errors(): def test_notify_about_errors():
results = { results = {
'site1': {'status': MaigretCheckResult('', '', '', MaigretCheckStatus.UNKNOWN, error=CheckError('Captcha'))}, 'site1': {
'site2': {'status': MaigretCheckResult('', '', '', MaigretCheckStatus.UNKNOWN, error=CheckError('Bot protection'))}, 'status': MaigretCheckResult(
'site3': {'status': MaigretCheckResult('', '', '', MaigretCheckStatus.UNKNOWN, error=CheckError('Access denied'))}, '', '', '', MaigretCheckStatus.UNKNOWN, error=CheckError('Captcha')
'site4': {'status': MaigretCheckResult('', '', '', MaigretCheckStatus.CLAIMED, error=None)}, )
},
'site2': {
'status': MaigretCheckResult(
'',
'',
'',
MaigretCheckStatus.UNKNOWN,
error=CheckError('Bot protection'),
)
},
'site3': {
'status': MaigretCheckResult(
'',
'',
'',
MaigretCheckStatus.UNKNOWN,
error=CheckError('Access denied'),
)
},
'site4': {
'status': MaigretCheckResult(
'', '', '', MaigretCheckStatus.CLAIMED, error=None
)
},
} }
results = notify_about_errors(results, query_notify=None, show_statistics=True) results = notify_about_errors(results, query_notify=None, show_statistics=True)
# Check the output # Check the output
expected_output = [ expected_output = [
('Too many errors of type "Captcha" (25.0%). Try to switch to another ip address or to use service cookies', '!'), (
('Too many errors of type "Bot protection" (25.0%). Try to switch to another ip address', '!'), 'Too many errors of type "Captcha" (25.0%). Try to switch to another ip address or to use service cookies',
'!',
),
(
'Too many errors of type "Bot protection" (25.0%). Try to switch to another ip address',
'!',
),
('Too many errors of type "Access denied" (25.0%)', '!'), ('Too many errors of type "Access denied" (25.0%)', '!'),
('Verbose error statistics:', '-'), ('Verbose error statistics:', '-'),
('Captcha: 25.0%', '!'), ('Captcha: 25.0%', '!'),
@@ -25,4 +55,4 @@ def test_notify_about_errors():
('Access denied: 25.0%', '!'), ('Access denied: 25.0%', '!'),
('You can see detailed site check errors with a flag `--print-errors`', '-'), ('You can see detailed site check errors with a flag `--print-errors`', '-'),
] ]
assert results == expected_output assert results == expected_output
+1
View File
@@ -15,6 +15,7 @@ from maigret.sites import MaigretSite
from maigret.result import MaigretCheckResult, MaigretCheckStatus from maigret.result import MaigretCheckResult, MaigretCheckStatus
from tests.conftest import RESULTS_EXAMPLE from tests.conftest import RESULTS_EXAMPLE
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_self_check_db(test_db): async def test_self_check_db(test_db):
+31 -15
View File
@@ -7,12 +7,18 @@ def test_gather_strict():
permute = Permute(elements) permute = Permute(elements)
result = permute.gather(method="strict") result = permute.gather(method="strict")
expected = { expected = {
'a_b': 1, 'b_a': 2, 'a_b': 1,
'a-b': 1, 'b-a': 2, 'b_a': 2,
'a.b': 1, 'b.a': 2, 'a-b': 1,
'ab': 1, 'ba': 2, 'b-a': 2,
'_ab': 1, 'ab_': 1, 'a.b': 1,
'_ba': 2, 'ba_': 2 'b.a': 2,
'ab': 1,
'ba': 2,
'_ab': 1,
'ab_': 1,
'_ba': 2,
'ba_': 2,
} }
assert result == expected assert result == expected
@@ -22,13 +28,23 @@ def test_gather_all():
permute = Permute(elements) permute = Permute(elements)
result = permute.gather(method="all") result = permute.gather(method="all")
expected = { expected = {
'a': 1, '_a': 1, 'a_': 1, 'a': 1,
'b': 2, '_b': 2, 'b_': 2, '_a': 1,
'a_b': 1, 'b_a': 2, 'a_': 1,
'a-b': 1, 'b-a': 2, 'b': 2,
'a.b': 1, 'b.a': 2, '_b': 2,
'ab': 1, 'ba': 2, 'b_': 2,
'_ab': 1, 'ab_': 1, 'a_b': 1,
'_ba': 2, 'ba_': 2 'b_a': 2,
'a-b': 1,
'b-a': 2,
'a.b': 1,
'b.a': 2,
'ab': 1,
'ba': 2,
'_ab': 1,
'ab_': 1,
'_ba': 2,
'ba_': 2,
} }
assert result == expected assert result == expected