Multiple lint and types fixes (#2454)

This commit is contained in:
Soxoj
2026-04-02 21:01:49 +02:00
committed by GitHub
parent 5d502eaef6
commit d136014576
17 changed files with 92 additions and 92 deletions
+3 -3
View File
@@ -54,7 +54,7 @@ class ParsingActivator:
logger.debug( logger.debug(
f"1 stage: {'success' if r.status_code == 302 else 'no 302 redirect, fail!'}" f"1 stage: {'success' if r.status_code == 302 else 'no 302 redirect, fail!'}"
) )
location = r.headers.get("Location") location = r.headers.get("Location", "")
# 2 stage: go to passport visitor page # 2 stage: go to passport visitor page
headers["Referer"] = location headers["Referer"] = location
@@ -84,9 +84,9 @@ def import_aiohttp_cookies(cookiestxt_filename):
cookies = CookieJar() cookies = CookieJar()
cookies_list = [] cookies_list = []
for domain in cookies_obj._cookies.values(): for domain in cookies_obj._cookies.values(): # type: ignore[attr-defined]
for key, cookie in list(domain.values())[0].items(): for key, cookie in list(domain.values())[0].items():
c = Morsel() c: Morsel = Morsel()
c.set(key, cookie.value, cookie.value) c.set(key, cookie.value, cookie.value)
c["domain"] = cookie.domain c["domain"] = cookie.domain
c["path"] = cookie.path c["path"] = cookie.path
+13 -17
View File
@@ -6,7 +6,7 @@ import random
import re import re
import ssl import ssl
import sys import sys
from typing import Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote from urllib.parse import quote
# Third party imports # Third party imports
@@ -15,7 +15,7 @@ from alive_progress import alive_bar
from aiohttp import ClientSession, TCPConnector, http_exceptions from aiohttp import ClientSession, TCPConnector, http_exceptions
from aiohttp.client_exceptions import ClientConnectorError, ServerDisconnectedError from aiohttp.client_exceptions import ClientConnectorError, ServerDisconnectedError
from python_socks import _errors as proxy_errors from python_socks import _errors as proxy_errors
from socid_extractor import extract from socid_extractor import extract # type: ignore[import-not-found]
try: try:
from mock import Mock from mock import Mock
@@ -80,7 +80,7 @@ class SimpleAiohttpChecker(CheckerBase):
async def _make_request( async def _make_request(
self, session, url, headers, allow_redirects, timeout, method, logger, payload=None self, session, url, headers, allow_redirects, timeout, method, logger, payload=None
) -> Tuple[str, int, Optional[CheckError]]: ) -> Tuple[Optional[str], int, Optional[CheckError]]:
try: try:
if method.lower() == 'get': if method.lower() == 'get':
request_method = session.get request_method = session.get
@@ -136,7 +136,7 @@ class SimpleAiohttpChecker(CheckerBase):
logger.debug(e, exc_info=True) logger.debug(e, exc_info=True)
return None, 0, CheckError("Unexpected", str(e)) return None, 0, CheckError("Unexpected", str(e))
async def check(self) -> Tuple[str, int, Optional[CheckError]]: async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]:
from aiohttp_socks import ProxyConnector from aiohttp_socks import ProxyConnector
# Use a real SSL context instead of ssl=False to avoid TLS fingerprinting # Use a real SSL context instead of ssl=False to avoid TLS fingerprinting
@@ -195,7 +195,7 @@ class AiodnsDomainResolver(CheckerBase):
self.url = url self.url = url
return None return None
async def check(self) -> Tuple[str, int, Optional[CheckError]]: async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]:
status = 404 status = 404
error = None error = None
text = '' text = ''
@@ -246,7 +246,7 @@ class CurlCffiChecker(CheckerBase):
async def close(self): async def close(self):
pass pass
async def check(self) -> Tuple[str, int, Optional[CheckError]]: async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]:
try: try:
async with CurlCffiAsyncSession() as session: async with CurlCffiAsyncSession() as session:
kwargs = { kwargs = {
@@ -290,7 +290,7 @@ class CheckerMock:
def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', payload=None): def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', payload=None):
return None return None
async def check(self) -> Tuple[str, int, Optional[CheckError]]: async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]:
await asyncio.sleep(0) await asyncio.sleep(0)
return '', 0, None return '', 0, None
@@ -885,7 +885,7 @@ async def maigret(
with alive_bar( with alive_bar(
len(tasks_dict), title="Searching", force_tty=True, disable=no_progressbar len(tasks_dict), title="Searching", force_tty=True, disable=no_progressbar
) as progress: ) as progress:
async for result in executor.run(tasks_dict.values()): async for result in executor.run(list(tasks_dict.values())): # type: ignore[arg-type]
cur_results.append(result) cur_results.append(result)
progress() progress()
@@ -961,7 +961,7 @@ async def site_self_check(
If False (default), only report issues without disabling. If False (default), only report issues without disabling.
diagnose: If True, print detailed diagnosis information. diagnose: If True, print detailed diagnosis information.
""" """
changes = { changes: Dict[str, Any] = {
"disabled": False, "disabled": False,
"issues": [], "issues": [],
"recommendations": [], "recommendations": [],
@@ -1008,7 +1008,7 @@ async def site_self_check(
results_cache[username] = results_dict[site.name] results_cache[username] = results_dict[site.name]
if result.error and 'Cannot connect to host' in result.error.desc: if result.error and 'Cannot connect to host' in result.error.desc:
changes["issues"].append(f"Cannot connect to host") changes["issues"].append("Cannot connect to host")
if auto_disable: if auto_disable:
changes["disabled"] = True changes["disabled"] = True
@@ -1066,11 +1066,11 @@ async def site_self_check(
if diagnose and changes["issues"]: if diagnose and changes["issues"]:
print(f"\n--- {site.name} DIAGNOSIS ---") print(f"\n--- {site.name} DIAGNOSIS ---")
print(f" Check type: {site.check_type}") print(f" Check type: {site.check_type}")
print(f" Issues:") print(" Issues:")
for issue in changes["issues"]: for issue in changes["issues"]:
print(f" - {issue}") print(f" - {issue}")
if changes["recommendations"]: if changes["recommendations"]:
print(f" Recommendations:") print(" Recommendations:")
for rec in changes["recommendations"]: for rec in changes["recommendations"]:
print(f" -> {rec}") print(f" -> {rec}")
@@ -1178,10 +1178,6 @@ async def self_check(
needs_update = total_disabled != 0 or unchecked_new_count != unchecked_old_count needs_update = total_disabled != 0 or unchecked_new_count != unchecked_old_count
# For backwards compatibility, return bool if auto_disable is True
if auto_disable:
return needs_update
return { return {
'needs_update': needs_update, 'needs_update': needs_update,
'results': all_results, 'results': all_results,
@@ -1205,7 +1201,7 @@ def parse_usernames(extracted_ids_data, logger) -> Dict:
elif "usernames" in k: elif "usernames" in k:
try: try:
tree = ast.literal_eval(v) tree = ast.literal_eval(v)
if type(tree) == list: if isinstance(tree, list):
for n in tree: for n in tree:
new_usernames[n] = "username" new_usernames[n] = "username"
except Exception as e: except Exception as e:
+3 -3
View File
@@ -103,7 +103,7 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.workers_count = kwargs.get('in_parallel', 10) self.workers_count = kwargs.get('in_parallel', 10)
self.queue = asyncio.Queue(self.workers_count) self.queue: asyncio.Queue = asyncio.Queue(self.workers_count)
self.timeout = kwargs.get('timeout') self.timeout = kwargs.get('timeout')
# Pass a progress function; alive_bar by default # Pass a progress function; alive_bar by default
self.progress_func = kwargs.get('progress_func', alive_bar) self.progress_func = kwargs.get('progress_func', alive_bar)
@@ -184,10 +184,10 @@ class AsyncioQueueGeneratorExecutor:
# Deprecated: will be removed soon, don't use it # Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.workers_count = kwargs.get('in_parallel', 10) self.workers_count = kwargs.get('in_parallel', 10)
self.queue = asyncio.Queue() self.queue: asyncio.Queue = asyncio.Queue()
self.timeout = kwargs.get('timeout') self.timeout = kwargs.get('timeout')
self.logger = kwargs['logger'] self.logger = kwargs['logger']
self._results = asyncio.Queue() self._results: asyncio.Queue = asyncio.Queue()
self._stop_signal = object() self._stop_signal = object()
async def worker(self): async def worker(self):
+2 -6
View File
@@ -13,7 +13,7 @@ from argparse import ArgumentParser, RawDescriptionHelpFormatter
from typing import List, Tuple from typing import List, Tuple
import os.path as path import os.path as path
from socid_extractor import extract, parse from socid_extractor import extract, parse # type: ignore[import-not-found]
from .__version__ import __version__ from .__version__ import __version__
from .checking import ( from .checking import (
@@ -75,7 +75,7 @@ def extract_ids_from_page(url, logger, timeout=5) -> dict:
elif 'usernames' in k: elif 'usernames' in k:
try: try:
tree = ast.literal_eval(v) tree = ast.literal_eval(v)
if type(tree) == list: if isinstance(tree, list):
for n in tree: for n in tree:
results[n] = 'username' results[n] = 'username'
except Exception as e: except Exception as e:
@@ -603,11 +603,7 @@ async def main():
no_progressbar=args.no_progressbar, no_progressbar=args.no_progressbar,
) )
# Handle both old (bool) and new (dict) return types
if isinstance(check_result, dict):
is_need_update = check_result.get('needs_update', False) is_need_update = check_result.get('needs_update', False)
else:
is_need_update = check_result
if is_need_update: if is_need_update:
if input('Do you want to save changes permanently? [Yn]\n').lower() in ( if input('Do you want to save changes permanently? [Yn]\n').lower() in (
+1 -1
View File
@@ -174,7 +174,7 @@ class QueryNotifyPrint(QueryNotify):
else: else:
return self.make_simple_terminal_notify(*args) return self.make_simple_terminal_notify(*args)
def start(self, message, id_type): def start(self, message=None, id_type="username"):
"""Notify Start. """Notify Start.
Will print the title to the standard output. Will print the title to the standard output.
+13 -12
View File
@@ -7,7 +7,7 @@ import os
from datetime import datetime from datetime import datetime
from typing import Dict, Any from typing import Dict, Any
import xmind import xmind # type: ignore[import-untyped]
from dateutil.tz import gettz from dateutil.tz import gettz
from dateutil.parser import parse as parse_datetime_str from dateutil.parser import parse as parse_datetime_str
from jinja2 import Template from jinja2 import Template
@@ -79,7 +79,7 @@ def save_pdf_report(filename: str, context: dict):
filled_template = template.render(**context) filled_template = template.render(**context)
# moved here to speed up the launch of Maigret # moved here to speed up the launch of Maigret
from xhtml2pdf import pisa from xhtml2pdf import pisa # type: ignore[import-untyped]
with open(filename, "w+b") as f: with open(filename, "w+b") as f:
pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css) pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css)
@@ -91,9 +91,9 @@ def save_json_report(filename: str, username: str, results: dict, report_type: s
class MaigretGraph: class MaigretGraph:
other_params = {'size': 10, 'group': 3} other_params: dict = {'size': 10, 'group': 3}
site_params = {'size': 15, 'group': 2} site_params: dict = {'size': 15, 'group': 2}
username_params = {'size': 20, 'group': 1} username_params: dict = {'size': 20, 'group': 1}
def __init__(self, graph): def __init__(self, graph):
self.G = graph self.G = graph
@@ -121,12 +121,12 @@ class MaigretGraph:
def save_graph_report(filename: str, username_results: list, db: MaigretDatabase): def save_graph_report(filename: str, username_results: list, db: MaigretDatabase):
import networkx as nx import networkx as nx
G = nx.Graph() G: Any = nx.Graph()
graph = MaigretGraph(G) graph = MaigretGraph(G)
base_site_nodes = {} base_site_nodes = {}
site_account_nodes = {} site_account_nodes = {}
processed_values = {} # Track processed values to avoid duplicates processed_values: Dict[str, Any] = {} # Track processed values to avoid duplicates
for username, id_type, results in username_results: for username, id_type, results in username_results:
# Add username node, using normalized version directly if different # Add username node, using normalized version directly if different
@@ -239,7 +239,7 @@ def save_graph_report(filename: str, username_results: list, db: MaigretDatabase
G.remove_nodes_from(single_degree_sites) G.remove_nodes_from(single_degree_sites)
# Generate interactive visualization # Generate interactive visualization
from pyvis.network import Network from pyvis.network import Network # type: ignore[import-untyped]
nt = Network(notebook=True, height="750px", width="100%") nt = Network(notebook=True, height="750px", width="100%")
nt.from_nx(G) nt.from_nx(G)
@@ -353,11 +353,12 @@ def generate_report_context(username_results: list):
if k in ["country", "locale"]: if k in ["country", "locale"]:
try: try:
if is_country_tag(k): if is_country_tag(k):
tag = pycountry.countries.get(alpha_2=v).alpha_2.lower() country = pycountry.countries.get(alpha_2=v)
tag = country.alpha_2.lower() # type: ignore[union-attr]
else: else:
tag = pycountry.countries.search_fuzzy(v)[ tag = pycountry.countries.search_fuzzy(v)[
0 0
].alpha_2.lower() ].alpha_2.lower() # type: ignore[attr-defined]
# TODO: move countries to another struct # TODO: move countries to another struct
tags[tag] = tags.get(tag, 0) + 1 tags[tag] = tags.get(tag, 0) + 1
except Exception as e: except Exception as e:
@@ -513,8 +514,8 @@ def add_xmind_subtopic(userlink, k, v, supposed_data):
def design_xmind_sheet(sheet, username, results): def design_xmind_sheet(sheet, username, results):
alltags = {} alltags: Dict[str, Any] = {}
supposed_data = {} supposed_data: Dict[str, Any] = {}
sheet.setTitle("%s Analysis" % (username)) sheet.setTitle("%s Analysis" % (username))
root_topic1 = sheet.getRootTopic() root_topic1 = sheet.getRootTopic()
+5 -5
View File
@@ -92,7 +92,7 @@ class MaigretSite:
# Alexa traffic rank # Alexa traffic rank
alexa_rank = None alexa_rank = None
# Source (in case a site is a mirror of another site) # Source (in case a site is a mirror of another site)
source = None source: Optional[str] = None
# URL protocol (http/https) # URL protocol (http/https)
protocol = '' protocol = ''
@@ -175,7 +175,7 @@ class MaigretSite:
self.__dict__[CaseConverter.camel_to_snake(group)], self.__dict__[CaseConverter.camel_to_snake(group)],
) )
self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check) self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check or "")
def detect_username(self, url: str) -> Optional[str]: def detect_username(self, url: str) -> Optional[str]:
if self.url_regexp: if self.url_regexp:
@@ -566,7 +566,7 @@ class MaigretDatabase:
def get_scan_stats(self, sites_dict): def get_scan_stats(self, sites_dict):
sites = sites_dict or self.sites_dict sites = sites_dict or self.sites_dict
found_flags = {} found_flags: Dict[str, int] = {}
for _, s in sites.items(): for _, s in sites.items():
if "presense_flag" in s.stats: if "presense_flag" in s.stats:
flag = s.stats["presense_flag"] flag = s.stats["presense_flag"]
@@ -587,8 +587,8 @@ class MaigretDatabase:
def get_db_stats(self, is_markdown=False): def get_db_stats(self, is_markdown=False):
# Initialize counters # Initialize counters
sites_dict = self.sites_dict sites_dict = self.sites_dict
urls = {} urls: Dict[str, int] = {}
tags = {} tags: Dict[str, int] = {}
disabled_count = 0 disabled_count = 0
message_checks_one_factor = 0 message_checks_one_factor = 0
status_checks = 0 status_checks = 0
+27 -24
View File
@@ -6,8 +6,7 @@ import logging
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from aiohttp import ClientSession, TCPConnector from aiohttp import ClientSession, TCPConnector
from aiohttp_socks import ProxyConnector import cloudscraper # type: ignore[import-untyped]
import cloudscraper
from colorama import Fore, Style from colorama import Fore, Style
from .activation import import_aiohttp_cookies from .activation import import_aiohttp_cookies
@@ -68,8 +67,10 @@ class Submitter:
else: else:
cookie_jar = import_aiohttp_cookies(args.cookie_file) cookie_jar = import_aiohttp_cookies(args.cookie_file)
connector = ProxyConnector.from_url(proxy) if proxy else TCPConnector(ssl=False) ssl_context = __import__('ssl').create_default_context()
connector.verify_ssl = False ssl_context.check_hostname = False
ssl_context.verify_mode = __import__('ssl').CERT_NONE
connector = ProxyConnector.from_url(proxy) if proxy else TCPConnector(ssl=ssl_context)
self.session = ClientSession( self.session = ClientSession(
connector=connector, trust_env=True, cookie_jar=cookie_jar connector=connector, trust_env=True, cookie_jar=cookie_jar
) )
@@ -88,7 +89,9 @@ class Submitter:
alexa_rank = 0 alexa_rank = 0
try: try:
alexa_rank = int(root.find('.//REACH').attrib['RANK']) reach_elem = root.find('.//REACH')
if reach_elem is not None:
alexa_rank = int(reach_elem.attrib['RANK'])
except Exception: except Exception:
pass pass
@@ -127,7 +130,7 @@ class Submitter:
async def detect_known_engine( async def detect_known_engine(
self, url_exists, url_mainpage, session, follow_redirects, headers self, url_exists, url_mainpage, session, follow_redirects, headers
) -> [List[MaigretSite], str]: ) -> Tuple[List[MaigretSite], str]:
session = session or self.session session = session or self.session
resp_text, _ = await self.get_html_response_to_compare( resp_text, _ = await self.get_html_response_to_compare(
@@ -191,8 +194,9 @@ class Submitter:
# TODO: replace with checking.py/SimpleAiohttpChecker call # TODO: replace with checking.py/SimpleAiohttpChecker call
@staticmethod @staticmethod
async def get_html_response_to_compare( async def get_html_response_to_compare(
url: str, session: ClientSession = None, redirects=False, headers: Dict = None url: str, session: Optional[ClientSession] = None, redirects=False, headers: Optional[Dict] = None
): ):
assert session is not None, "session must not be None"
async with session.get( async with session.get(
url, allow_redirects=redirects, headers=headers url, allow_redirects=redirects, headers=headers
) as response: ) as response:
@@ -211,10 +215,10 @@ class Submitter:
username: str, username: str,
url_exists: str, url_exists: str,
cookie_filename="", # TODO: use cookies cookie_filename="", # TODO: use cookies
session: ClientSession = None, session: Optional[ClientSession] = None,
follow_redirects=False, follow_redirects=False,
headers: dict = None, headers: Optional[dict] = None,
) -> Tuple[List[str], List[str], str, str]: ) -> Tuple[Optional[List[str]], Optional[List[str]], str, str]:
random_username = generate_random_username() random_username = generate_random_username()
url_of_non_existing_account = url_exists.lower().replace( url_of_non_existing_account = url_exists.lower().replace(
@@ -269,11 +273,8 @@ class Submitter:
tokens_a = set(re.split(f'[{self.SEPARATORS}]', first_html_response)) tokens_a = set(re.split(f'[{self.SEPARATORS}]', first_html_response))
tokens_b = set(re.split(f'[{self.SEPARATORS}]', second_html_response)) tokens_b = set(re.split(f'[{self.SEPARATORS}]', second_html_response))
a_minus_b = tokens_a.difference(tokens_b) a_minus_b: List[str] = [x.strip('\\') for x in tokens_a.difference(tokens_b)]
b_minus_a = tokens_b.difference(tokens_a) b_minus_a: List[str] = [x.strip('\\') for x in tokens_b.difference(tokens_a)]
a_minus_b = list(map(lambda x: x.strip('\\'), a_minus_b))
b_minus_a = list(map(lambda x: x.strip('\\'), b_minus_a))
# Filter out strings containing usernames # Filter out strings containing usernames
a_minus_b = [s for s in a_minus_b if username.lower() not in s.lower()] a_minus_b = [s for s in a_minus_b if username.lower() not in s.lower()]
@@ -378,7 +379,7 @@ class Submitter:
).strip() ).strip()
if field in ['tags', 'presense_strs', 'absence_strs']: if field in ['tags', 'presense_strs', 'absence_strs']:
new_value = list(map(str.strip, new_value.split(','))) new_value = list(map(str.strip, new_value.split(','))) # type: ignore[assignment]
if new_value: if new_value:
setattr(site, field, new_value) setattr(site, field, new_value)
@@ -424,12 +425,12 @@ class Submitter:
f"{Fore.YELLOW}[!] Sites with domain \"{domain_raw}\" already exists in the Maigret database!{Style.RESET_ALL}" f"{Fore.YELLOW}[!] Sites with domain \"{domain_raw}\" already exists in the Maigret database!{Style.RESET_ALL}"
) )
status = lambda s: "(disabled)" if s.disabled else "" site_status = lambda s: "(disabled)" if s.disabled else ""
url_block = lambda s: f"\n\t{s.url_main}\n\t{s.url}" url_block = lambda s: f"\n\t{s.url_main}\n\t{s.url}"
print( print(
"\n".join( "\n".join(
[ [
f"{site.name} {status(site)}{url_block(site)}" f"{site.name} {site_status(site)}{url_block(site)}"
for site in matched_sites for site in matched_sites
] ]
) )
@@ -497,7 +498,7 @@ class Submitter:
) )
print('Detecting site engine, please wait...') print('Detecting site engine, please wait...')
sites = [] sites: List[MaigretSite] = []
text = None text = None
try: try:
sites, text = await self.detect_known_engine( sites, text = await self.detect_known_engine(
@@ -510,7 +511,7 @@ class Submitter:
except KeyboardInterrupt: except KeyboardInterrupt:
print('Engine detect process is interrupted.') print('Engine detect process is interrupted.')
if 'cloudflare' in text.lower(): if text and 'cloudflare' in text.lower():
print( print(
'Cloudflare protection detected. I will use cloudscraper for further work' 'Cloudflare protection detected. I will use cloudscraper for further work'
) )
@@ -573,6 +574,8 @@ class Submitter:
found = True found = True
break break
assert chosen_site is not None, "No sites to check"
if not found: if not found:
print( print(
f"{Fore.RED}[!] The check for site '{chosen_site.name}' failed!{Style.RESET_ALL}" f"{Fore.RED}[!] The check for site '{chosen_site.name}' failed!{Style.RESET_ALL}"
@@ -631,8 +634,8 @@ class Submitter:
# chosen_site.alexa_rank = rank # chosen_site.alexa_rank = rank
self.logger.info(chosen_site.json) self.logger.info(chosen_site.json)
site_data = chosen_site.strip_engine_data() stripped_site = chosen_site.strip_engine_data()
self.logger.info(site_data.json) self.logger.info(stripped_site.json)
if old_site: if old_site:
# Update old site with new values and log changes # Update old site with new values and log changes
@@ -651,7 +654,7 @@ class Submitter:
for field, display_name in fields_to_check.items(): for field, display_name in fields_to_check.items():
old_value = getattr(old_site, field) old_value = getattr(old_site, field)
new_value = getattr(site_data, field) new_value = getattr(stripped_site, field)
if field == 'tags' and not new_tags: if field == 'tags' and not new_tags:
continue continue
if str(old_value) != str(new_value): if str(old_value) != str(new_value):
@@ -661,7 +664,7 @@ class Submitter:
old_site.__dict__[field] = new_value old_site.__dict__[field] = new_value
# update the site # update the site
final_site = old_site if old_site else site_data final_site = old_site if old_site else stripped_site
self.db.update_site(final_site) self.db.update_site(final_site)
# save the db in file # save the db in file
+1 -1
View File
@@ -86,7 +86,7 @@ def get_dict_ascii_tree(items, prepend="", new_line=True):
new_result + new_line if num != len(items) - 1 else last_result + new_line new_result + new_line if num != len(items) - 1 else last_result + new_line
) )
if type(item) == tuple: if isinstance(item, tuple):
field_name, field_value = item field_name, field_value = item
if field_value.startswith("['"): if field_value.startswith("['"):
is_last_item = num == len(items) - 1 is_last_item = num == len(items) - 1
+3 -2
View File
@@ -13,6 +13,7 @@ import os
import asyncio import asyncio
from datetime import datetime from datetime import datetime
from threading import Thread from threading import Thread
from typing import Any, Dict
import maigret import maigret
import maigret.settings import maigret.settings
from maigret.sites import MaigretDatabase from maigret.sites import MaigretDatabase
@@ -23,7 +24,7 @@ app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24).hex()) app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24).hex())
# add background job tracking # add background job tracking
background_jobs = {} background_jobs: Dict[str, Any] = {}
job_results = {} job_results = {}
# Configuration # Configuration
@@ -260,7 +261,7 @@ def search():
target=process_search_task, args=(usernames, options, timestamp) target=process_search_task, args=(usernames, options, timestamp)
), ),
} }
background_jobs[timestamp]['thread'].start() background_jobs[timestamp]['thread'].start() # type: ignore[union-attr]
return redirect(url_for('status', timestamp=timestamp)) return redirect(url_for('status', timestamp=timestamp))
+2 -2
View File
@@ -36,7 +36,7 @@ def test_notify_about_errors():
}, },
} }
results = notify_about_errors(results, query_notify=None, show_statistics=True) notifications = notify_about_errors(results, query_notify=None, show_statistics=True)
# Check the output # Check the output
expected_output = [ expected_output = [
@@ -55,4 +55,4 @@ def test_notify_about_errors():
('Access denied: 25.0%', '!'), ('Access denied: 25.0%', '!'),
('You can see detailed site check errors with a flag `--print-errors`', '-'), ('You can see detailed site check errors with a flag `--print-errors`', '-'),
] ]
assert results == expected_output assert notifications == expected_output
+10 -9
View File
@@ -3,6 +3,7 @@
import pytest import pytest
import asyncio import asyncio
import logging import logging
from typing import Any, List, Tuple, Callable, Dict
from maigret.executors import ( from maigret.executors import (
AsyncioSimpleExecutor, AsyncioSimpleExecutor,
AsyncioProgressbarExecutor, AsyncioProgressbarExecutor,
@@ -21,7 +22,7 @@ async def func(n):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple_asyncio_executor(): async def test_simple_asyncio_executor():
tasks = [(func, [n], {}) for n in range(10)] tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
executor = AsyncioSimpleExecutor(logger=logger) executor = AsyncioSimpleExecutor(logger=logger)
assert await executor.run(tasks) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] assert await executor.run(tasks) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert executor.execution_time > 0.2 assert executor.execution_time > 0.2
@@ -30,7 +31,7 @@ async def test_simple_asyncio_executor():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_asyncio_progressbar_executor(): async def test_asyncio_progressbar_executor():
tasks = [(func, [n], {}) for n in range(10)] tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
executor = AsyncioProgressbarExecutor(logger=logger) executor = AsyncioProgressbarExecutor(logger=logger)
# no guarantees for the results order # no guarantees for the results order
@@ -41,7 +42,7 @@ async def test_asyncio_progressbar_executor():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_asyncio_progressbar_semaphore_executor(): async def test_asyncio_progressbar_semaphore_executor():
tasks = [(func, [n], {}) for n in range(10)] tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
executor = AsyncioProgressbarSemaphoreExecutor(logger=logger, in_parallel=5) executor = AsyncioProgressbarSemaphoreExecutor(logger=logger, in_parallel=5)
# no guarantees for the results order # no guarantees for the results order
@@ -53,7 +54,7 @@ async def test_asyncio_progressbar_semaphore_executor():
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_asyncio_progressbar_queue_executor(): async def test_asyncio_progressbar_queue_executor():
tasks = [(func, [n], {}) for n in range(10)] tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=2) executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=2)
assert await executor.run(tasks) == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8] assert await executor.run(tasks) == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8]
@@ -81,22 +82,22 @@ async def test_asyncio_progressbar_queue_executor():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_asyncio_queue_generator_executor(): async def test_asyncio_queue_generator_executor():
tasks = [(func, [n], {}) for n in range(10)] tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)]
executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=2) executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=2)
results = [result async for result in executor.run(tasks)] results = [result async for result in executor.run(tasks)] # type: ignore[arg-type]
assert results == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8] assert results == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8]
assert executor.execution_time > 0.5 assert executor.execution_time > 0.5
assert executor.execution_time < 0.6 assert executor.execution_time < 0.6
executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=3) executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=3)
results = [result async for result in executor.run(tasks)] results = [result async for result in executor.run(tasks)] # type: ignore[arg-type]
assert results == [0, 3, 1, 4, 6, 2, 7, 9, 5, 8] assert results == [0, 3, 1, 4, 6, 2, 7, 9, 5, 8]
assert executor.execution_time > 0.4 assert executor.execution_time > 0.4
assert executor.execution_time < 0.5 assert executor.execution_time < 0.5
executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=5) executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=5)
results = [result async for result in executor.run(tasks)] results = [result async for result in executor.run(tasks)] # type: ignore[arg-type]
assert results in ( assert results in (
[0, 3, 6, 1, 4, 7, 9, 2, 5, 8], [0, 3, 6, 1, 4, 7, 9, 2, 5, 8],
[0, 3, 6, 1, 4, 9, 7, 2, 5, 8], [0, 3, 6, 1, 4, 9, 7, 2, 5, 8],
@@ -105,7 +106,7 @@ async def test_asyncio_queue_generator_executor():
assert executor.execution_time < 0.4 assert executor.execution_time < 0.4
executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=10) executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=10)
results = [result async for result in executor.run(tasks)] results = [result async for result in executor.run(tasks)] # type: ignore[arg-type]
assert results == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8] assert results == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
assert executor.execution_time > 0.2 assert executor.execution_time > 0.2
assert executor.execution_time < 0.3 assert executor.execution_time < 0.3
+1 -1
View File
@@ -158,7 +158,7 @@ def test_extract_ids_from_page(test_db):
def test_extract_ids_from_results(test_db): def test_extract_ids_from_results(test_db):
TEST_EXAMPLE = copy.deepcopy(RESULTS_EXAMPLE) TEST_EXAMPLE: dict = copy.deepcopy(RESULTS_EXAMPLE)
TEST_EXAMPLE['Reddit']['ids_usernames'] = {'test1': 'yandex_public_id'} TEST_EXAMPLE['Reddit']['ids_usernames'] = {'test1': 'yandex_public_id'}
TEST_EXAMPLE['Reddit']['ids_links'] = ['https://www.reddit.com/user/test2'] TEST_EXAMPLE['Reddit']['ids_links'] = ['https://www.reddit.com/user/test2']
+1 -1
View File
@@ -6,7 +6,7 @@ import os
import pytest import pytest
from io import StringIO from io import StringIO
import xmind import xmind # type: ignore[import-untyped]
from jinja2 import Template from jinja2 import Template
from maigret.report import ( from maigret.report import (
+3 -1
View File
@@ -1,8 +1,10 @@
"""Maigret Database test functions""" """Maigret Database test functions"""
from typing import Any, Dict
from maigret.sites import MaigretDatabase, MaigretSite from maigret.sites import MaigretDatabase, MaigretSite
EXAMPLE_DB = { EXAMPLE_DB: Dict[str, Any] = {
'engines': { 'engines': {
"XenForo": { "XenForo": {
"presenseStrs": ["XenForo"], "presenseStrs": ["XenForo"],
+2 -2
View File
@@ -28,7 +28,7 @@ async def test_detect_known_engine(test_db, local_test_db):
url_exists = "https://devforum.zoom.us/u/adam" url_exists = "https://devforum.zoom.us/u/adam"
url_mainpage = "https://devforum.zoom.us/" url_mainpage = "https://devforum.zoom.us/"
# Mock extract_username_dialog to return "adam" # Mock extract_username_dialog to return "adam"
submitter.extract_username_dialog = MagicMock(return_value="adam") submitter.extract_username_dialog = MagicMock(return_value="adam") # type: ignore[method-assign]
sites, resp_text = await submitter.detect_known_engine( sites, resp_text = await submitter.detect_known_engine(
url_exists, url_mainpage, session=None, follow_redirects=False, headers=None url_exists, url_mainpage, session=None, follow_redirects=False, headers=None
@@ -111,7 +111,7 @@ async def test_check_features_manually_success(settings):
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_check_features_manually_success(settings): async def test_check_features_manually_cloudflare(settings):
# Setup # Setup
db = MaigretDatabase() db = MaigretDatabase()
logger = logging.getLogger("test_logger") logger = logging.getLogger("test_logger")