diff --git a/maigret/activation.py b/maigret/activation.py index 65d9024..faa1acb 100644 --- a/maigret/activation.py +++ b/maigret/activation.py @@ -54,7 +54,7 @@ class ParsingActivator: logger.debug( f"1 stage: {'success' if r.status_code == 302 else 'no 302 redirect, fail!'}" ) - location = r.headers.get("Location") + location = r.headers.get("Location", "") # 2 stage: go to passport visitor page headers["Referer"] = location @@ -84,9 +84,9 @@ def import_aiohttp_cookies(cookiestxt_filename): cookies = CookieJar() cookies_list = [] - for domain in cookies_obj._cookies.values(): + for domain in cookies_obj._cookies.values(): # type: ignore[attr-defined] for key, cookie in list(domain.values())[0].items(): - c = Morsel() + c: Morsel = Morsel() c.set(key, cookie.value, cookie.value) c["domain"] = cookie.domain c["path"] = cookie.path diff --git a/maigret/checking.py b/maigret/checking.py index 8151091..f469af5 100644 --- a/maigret/checking.py +++ b/maigret/checking.py @@ -6,7 +6,7 @@ import random import re import ssl import sys -from typing import Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from urllib.parse import quote # Third party imports @@ -15,7 +15,7 @@ from alive_progress import alive_bar from aiohttp import ClientSession, TCPConnector, http_exceptions from aiohttp.client_exceptions import ClientConnectorError, ServerDisconnectedError from python_socks import _errors as proxy_errors -from socid_extractor import extract +from socid_extractor import extract # type: ignore[import-not-found] try: from mock import Mock @@ -80,7 +80,7 @@ class SimpleAiohttpChecker(CheckerBase): async def _make_request( self, session, url, headers, allow_redirects, timeout, method, logger, payload=None - ) -> Tuple[str, int, Optional[CheckError]]: + ) -> Tuple[Optional[str], int, Optional[CheckError]]: try: if method.lower() == 'get': request_method = session.get @@ -136,7 +136,7 @@ class SimpleAiohttpChecker(CheckerBase): logger.debug(e, 
exc_info=True) return None, 0, CheckError("Unexpected", str(e)) - async def check(self) -> Tuple[str, int, Optional[CheckError]]: + async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]: from aiohttp_socks import ProxyConnector # Use a real SSL context instead of ssl=False to avoid TLS fingerprinting @@ -195,7 +195,7 @@ class AiodnsDomainResolver(CheckerBase): self.url = url return None - async def check(self) -> Tuple[str, int, Optional[CheckError]]: + async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]: status = 404 error = None text = '' @@ -246,7 +246,7 @@ class CurlCffiChecker(CheckerBase): async def close(self): pass - async def check(self) -> Tuple[str, int, Optional[CheckError]]: + async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]: try: async with CurlCffiAsyncSession() as session: kwargs = { @@ -290,7 +290,7 @@ class CheckerMock: def prepare(self, url, headers=None, allow_redirects=True, timeout=0, method='get', payload=None): return None - async def check(self) -> Tuple[str, int, Optional[CheckError]]: + async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]: await asyncio.sleep(0) return '', 0, None @@ -885,7 +885,7 @@ async def maigret( with alive_bar( len(tasks_dict), title="Searching", force_tty=True, disable=no_progressbar ) as progress: - async for result in executor.run(tasks_dict.values()): + async for result in executor.run(list(tasks_dict.values())): # type: ignore[arg-type] cur_results.append(result) progress() @@ -961,7 +961,7 @@ async def site_self_check( If False (default), only report issues without disabling. diagnose: If True, print detailed diagnosis information. 
""" - changes = { + changes: Dict[str, Any] = { "disabled": False, "issues": [], "recommendations": [], @@ -1008,7 +1008,7 @@ async def site_self_check( results_cache[username] = results_dict[site.name] if result.error and 'Cannot connect to host' in result.error.desc: - changes["issues"].append(f"Cannot connect to host") + changes["issues"].append("Cannot connect to host") if auto_disable: changes["disabled"] = True @@ -1066,11 +1066,11 @@ async def site_self_check( if diagnose and changes["issues"]: print(f"\n--- {site.name} DIAGNOSIS ---") print(f" Check type: {site.check_type}") - print(f" Issues:") + print(" Issues:") for issue in changes["issues"]: print(f" - {issue}") if changes["recommendations"]: - print(f" Recommendations:") + print(" Recommendations:") for rec in changes["recommendations"]: print(f" -> {rec}") @@ -1178,10 +1178,6 @@ async def self_check( needs_update = total_disabled != 0 or unchecked_new_count != unchecked_old_count - # For backwards compatibility, return bool if auto_disable is True - if auto_disable: - return needs_update - return { 'needs_update': needs_update, 'results': all_results, @@ -1205,7 +1201,7 @@ def parse_usernames(extracted_ids_data, logger) -> Dict: elif "usernames" in k: try: tree = ast.literal_eval(v) - if type(tree) == list: + if isinstance(tree, list): for n in tree: new_usernames[n] = "username" except Exception as e: diff --git a/maigret/executors.py b/maigret/executors.py index a14c4c2..0c8243a 100644 --- a/maigret/executors.py +++ b/maigret/executors.py @@ -103,7 +103,7 @@ class AsyncioProgressbarQueueExecutor(AsyncExecutor): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.workers_count = kwargs.get('in_parallel', 10) - self.queue = asyncio.Queue(self.workers_count) + self.queue: asyncio.Queue = asyncio.Queue(self.workers_count) self.timeout = kwargs.get('timeout') # Pass a progress function; alive_bar by default self.progress_func = kwargs.get('progress_func', alive_bar) @@ -184,10 
+184,10 @@ class AsyncioQueueGeneratorExecutor: # Deprecated: will be removed soon, don't use it def __init__(self, *args, **kwargs): self.workers_count = kwargs.get('in_parallel', 10) - self.queue = asyncio.Queue() + self.queue: asyncio.Queue = asyncio.Queue() self.timeout = kwargs.get('timeout') self.logger = kwargs['logger'] - self._results = asyncio.Queue() + self._results: asyncio.Queue = asyncio.Queue() self._stop_signal = object() async def worker(self): diff --git a/maigret/maigret.py b/maigret/maigret.py index a96bd53..dc1235e 100755 --- a/maigret/maigret.py +++ b/maigret/maigret.py @@ -13,7 +13,7 @@ from argparse import ArgumentParser, RawDescriptionHelpFormatter from typing import List, Tuple import os.path as path -from socid_extractor import extract, parse +from socid_extractor import extract, parse # type: ignore[import-not-found] from .__version__ import __version__ from .checking import ( @@ -75,7 +75,7 @@ def extract_ids_from_page(url, logger, timeout=5) -> dict: elif 'usernames' in k: try: tree = ast.literal_eval(v) - if type(tree) == list: + if isinstance(tree, list): for n in tree: results[n] = 'username' except Exception as e: @@ -603,11 +603,7 @@ async def main(): no_progressbar=args.no_progressbar, ) - # Handle both old (bool) and new (dict) return types - if isinstance(check_result, dict): - is_need_update = check_result.get('needs_update', False) - else: - is_need_update = check_result + is_need_update = check_result.get('needs_update', False) if is_need_update: if input('Do you want to save changes permanently? [Yn]\n').lower() in ( diff --git a/maigret/notify.py b/maigret/notify.py index c801222..216d9b2 100644 --- a/maigret/notify.py +++ b/maigret/notify.py @@ -174,7 +174,7 @@ class QueryNotifyPrint(QueryNotify): else: return self.make_simple_terminal_notify(*args) - def start(self, message, id_type): + def start(self, message=None, id_type="username"): """Notify Start. Will print the title to the standard output. 
diff --git a/maigret/report.py b/maigret/report.py index 22814b7..9c13e3d 100644 --- a/maigret/report.py +++ b/maigret/report.py @@ -7,7 +7,7 @@ import os from datetime import datetime from typing import Dict, Any -import xmind +import xmind # type: ignore[import-untyped] from dateutil.tz import gettz from dateutil.parser import parse as parse_datetime_str from jinja2 import Template @@ -79,7 +79,7 @@ def save_pdf_report(filename: str, context: dict): filled_template = template.render(**context) # moved here to speed up the launch of Maigret - from xhtml2pdf import pisa + from xhtml2pdf import pisa # type: ignore[import-untyped] with open(filename, "w+b") as f: pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css) @@ -91,9 +91,9 @@ def save_json_report(filename: str, username: str, results: dict, report_type: s class MaigretGraph: - other_params = {'size': 10, 'group': 3} - site_params = {'size': 15, 'group': 2} - username_params = {'size': 20, 'group': 1} + other_params: dict = {'size': 10, 'group': 3} + site_params: dict = {'size': 15, 'group': 2} + username_params: dict = {'size': 20, 'group': 1} def __init__(self, graph): self.G = graph @@ -121,12 +121,12 @@ class MaigretGraph: def save_graph_report(filename: str, username_results: list, db: MaigretDatabase): import networkx as nx - G = nx.Graph() + G: Any = nx.Graph() graph = MaigretGraph(G) base_site_nodes = {} site_account_nodes = {} - processed_values = {} # Track processed values to avoid duplicates + processed_values: Dict[str, Any] = {} # Track processed values to avoid duplicates for username, id_type, results in username_results: # Add username node, using normalized version directly if different @@ -239,7 +239,7 @@ def save_graph_report(filename: str, username_results: list, db: MaigretDatabase G.remove_nodes_from(single_degree_sites) # Generate interactive visualization - from pyvis.network import Network + from pyvis.network import Network # type: ignore[import-untyped] nt = 
Network(notebook=True, height="750px", width="100%") nt.from_nx(G) @@ -353,11 +353,12 @@ def generate_report_context(username_results: list): if k in ["country", "locale"]: try: if is_country_tag(k): - tag = pycountry.countries.get(alpha_2=v).alpha_2.lower() + country = pycountry.countries.get(alpha_2=v) + tag = country.alpha_2.lower() # type: ignore[union-attr] else: tag = pycountry.countries.search_fuzzy(v)[ 0 - ].alpha_2.lower() + ].alpha_2.lower() # type: ignore[attr-defined] # TODO: move countries to another struct tags[tag] = tags.get(tag, 0) + 1 except Exception as e: @@ -513,8 +514,8 @@ def add_xmind_subtopic(userlink, k, v, supposed_data): def design_xmind_sheet(sheet, username, results): - alltags = {} - supposed_data = {} + alltags: Dict[str, Any] = {} + supposed_data: Dict[str, Any] = {} sheet.setTitle("%s Analysis" % (username)) root_topic1 = sheet.getRootTopic() diff --git a/maigret/sites.py b/maigret/sites.py index aec50ac..3d059cc 100644 --- a/maigret/sites.py +++ b/maigret/sites.py @@ -92,7 +92,7 @@ class MaigretSite: # Alexa traffic rank alexa_rank = None # Source (in case a site is a mirror of another site) - source = None + source: Optional[str] = None # URL protocol (http/https) protocol = '' @@ -175,7 +175,7 @@ class MaigretSite: self.__dict__[CaseConverter.camel_to_snake(group)], ) - self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check) + self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check or "") def detect_username(self, url: str) -> Optional[str]: if self.url_regexp: @@ -566,7 +566,7 @@ class MaigretDatabase: def get_scan_stats(self, sites_dict): sites = sites_dict or self.sites_dict - found_flags = {} + found_flags: Dict[str, int] = {} for _, s in sites.items(): if "presense_flag" in s.stats: flag = s.stats["presense_flag"] @@ -587,8 +587,8 @@ class MaigretDatabase: def get_db_stats(self, is_markdown=False): # Initialize counters sites_dict = self.sites_dict - urls = {} - tags = {} + urls: 
Dict[str, int] = {} +    tags: Dict[str, int] = {} disabled_count = 0 message_checks_one_factor = 0 status_checks = 0 diff --git a/maigret/submit.py b/maigret/submit.py index 4603aef..2b470ce 100644 --- a/maigret/submit.py +++ b/maigret/submit.py @@ -6,8 +6,8 @@ import logging from typing import Any, Dict, List, Optional, Tuple from aiohttp import ClientSession, TCPConnector from aiohttp_socks import ProxyConnector -import cloudscraper +import cloudscraper  # type: ignore[import-untyped] from colorama import Fore, Style from .activation import import_aiohttp_cookies @@ -68,8 +67,10 @@ class Submitter: else: cookie_jar = import_aiohttp_cookies(args.cookie_file) -        connector = ProxyConnector.from_url(proxy) if proxy else TCPConnector(ssl=False) -        connector.verify_ssl = False +        ssl_context = __import__('ssl').create_default_context() +        ssl_context.check_hostname = False +        ssl_context.verify_mode = __import__('ssl').CERT_NONE +        connector = ProxyConnector.from_url(proxy) if proxy else TCPConnector(ssl=ssl_context) self.session = ClientSession( connector=connector, trust_env=True, cookie_jar=cookie_jar ) @@ -88,7 +89,9 @@ class Submitter: alexa_rank = 0 try: -            alexa_rank = int(root.find('.//REACH').attrib['RANK']) +            reach_elem = root.find('.//REACH') +            if reach_elem is not None: +                alexa_rank = int(reach_elem.attrib['RANK']) except Exception: pass @@ -127,7 +130,7 @@ class Submitter: async def detect_known_engine( self, url_exists, url_mainpage, session, follow_redirects, headers -    ) -> [List[MaigretSite], str]: +    ) -> Tuple[List[MaigretSite], str]: session = session or self.session resp_text, _ = await self.get_html_response_to_compare( @@ -191,8 +194,9 @@ class Submitter: # TODO: replace with checking.py/SimpleAiohttpChecker call @staticmethod async def get_html_response_to_compare( -        url: str, session: ClientSession = None, redirects=False, headers: Dict = None +        url: str, session: Optional[ClientSession] = None, redirects=False, headers: Optional[Dict] = None ): +        assert
session is not None, "session must not be None" async with session.get( url, allow_redirects=redirects, headers=headers ) as response: @@ -211,10 +215,10 @@ class Submitter: username: str, url_exists: str, cookie_filename="", # TODO: use cookies - session: ClientSession = None, + session: Optional[ClientSession] = None, follow_redirects=False, - headers: dict = None, - ) -> Tuple[List[str], List[str], str, str]: + headers: Optional[dict] = None, + ) -> Tuple[Optional[List[str]], Optional[List[str]], str, str]: random_username = generate_random_username() url_of_non_existing_account = url_exists.lower().replace( @@ -269,11 +273,8 @@ class Submitter: tokens_a = set(re.split(f'[{self.SEPARATORS}]', first_html_response)) tokens_b = set(re.split(f'[{self.SEPARATORS}]', second_html_response)) - a_minus_b = tokens_a.difference(tokens_b) - b_minus_a = tokens_b.difference(tokens_a) - - a_minus_b = list(map(lambda x: x.strip('\\'), a_minus_b)) - b_minus_a = list(map(lambda x: x.strip('\\'), b_minus_a)) + a_minus_b: List[str] = [x.strip('\\') for x in tokens_a.difference(tokens_b)] + b_minus_a: List[str] = [x.strip('\\') for x in tokens_b.difference(tokens_a)] # Filter out strings containing usernames a_minus_b = [s for s in a_minus_b if username.lower() not in s.lower()] @@ -378,7 +379,7 @@ class Submitter: ).strip() if field in ['tags', 'presense_strs', 'absence_strs']: - new_value = list(map(str.strip, new_value.split(','))) + new_value = list(map(str.strip, new_value.split(','))) # type: ignore[assignment] if new_value: setattr(site, field, new_value) @@ -424,12 +425,12 @@ class Submitter: f"{Fore.YELLOW}[!] 
Sites with domain \"{domain_raw}\" already exists in the Maigret database!{Style.RESET_ALL}" ) - status = lambda s: "(disabled)" if s.disabled else "" + site_status = lambda s: "(disabled)" if s.disabled else "" url_block = lambda s: f"\n\t{s.url_main}\n\t{s.url}" print( "\n".join( [ - f"{site.name} {status(site)}{url_block(site)}" + f"{site.name} {site_status(site)}{url_block(site)}" for site in matched_sites ] ) @@ -497,7 +498,7 @@ class Submitter: ) print('Detecting site engine, please wait...') - sites = [] + sites: List[MaigretSite] = [] text = None try: sites, text = await self.detect_known_engine( @@ -510,7 +511,7 @@ class Submitter: except KeyboardInterrupt: print('Engine detect process is interrupted.') - if 'cloudflare' in text.lower(): + if text and 'cloudflare' in text.lower(): print( 'Cloudflare protection detected. I will use cloudscraper for further work' ) @@ -573,6 +574,8 @@ class Submitter: found = True break + assert chosen_site is not None, "No sites to check" + if not found: print( f"{Fore.RED}[!] 
The check for site '{chosen_site.name}' failed!{Style.RESET_ALL}" @@ -631,8 +634,8 @@ class Submitter: # chosen_site.alexa_rank = rank self.logger.info(chosen_site.json) - site_data = chosen_site.strip_engine_data() - self.logger.info(site_data.json) + stripped_site = chosen_site.strip_engine_data() + self.logger.info(stripped_site.json) if old_site: # Update old site with new values and log changes @@ -651,7 +654,7 @@ class Submitter: for field, display_name in fields_to_check.items(): old_value = getattr(old_site, field) - new_value = getattr(site_data, field) + new_value = getattr(stripped_site, field) if field == 'tags' and not new_tags: continue if str(old_value) != str(new_value): @@ -661,7 +664,7 @@ class Submitter: old_site.__dict__[field] = new_value # update the site - final_site = old_site if old_site else site_data + final_site = old_site if old_site else stripped_site self.db.update_site(final_site) # save the db in file diff --git a/maigret/utils.py b/maigret/utils.py index 4cb326c..0e38306 100644 --- a/maigret/utils.py +++ b/maigret/utils.py @@ -86,7 +86,7 @@ def get_dict_ascii_tree(items, prepend="", new_line=True): new_result + new_line if num != len(items) - 1 else last_result + new_line ) - if type(item) == tuple: + if isinstance(item, tuple): field_name, field_value = item if field_value.startswith("['"): is_last_item = num == len(items) - 1 diff --git a/maigret/web/app.py b/maigret/web/app.py index a1a8857..522cec9 100644 --- a/maigret/web/app.py +++ b/maigret/web/app.py @@ -13,6 +13,7 @@ import os import asyncio from datetime import datetime from threading import Thread +from typing import Any, Dict import maigret import maigret.settings from maigret.sites import MaigretDatabase @@ -23,7 +24,7 @@ app = Flask(__name__) app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24).hex()) # add background job tracking -background_jobs = {} +background_jobs: Dict[str, Any] = {} job_results = {} # Configuration @@ -260,7 +261,7 @@ def search(): 
target=process_search_task, args=(usernames, options, timestamp) ), } - background_jobs[timestamp]['thread'].start() + background_jobs[timestamp]['thread'].start() # type: ignore[union-attr] return redirect(url_for('status', timestamp=timestamp)) diff --git a/pyproject.toml b/pyproject.toml index 91829e5..a13e271 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -95,4 +95,4 @@ black = ">=25.1,<27.0" [tool.poetry.scripts] # Run with: poetry run maigret maigret = "maigret.maigret:run" -update_sitesmd = "utils.update_site_data:main" \ No newline at end of file +update_sitesmd = "utils.update_site_data:main" diff --git a/tests/test_errors.py b/tests/test_errors.py index e44bf7d..caca3ac 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -36,7 +36,7 @@ def test_notify_about_errors(): }, } - results = notify_about_errors(results, query_notify=None, show_statistics=True) + notifications = notify_about_errors(results, query_notify=None, show_statistics=True) # Check the output expected_output = [ @@ -55,4 +55,4 @@ def test_notify_about_errors(): ('Access denied: 25.0%', '!'), ('You can see detailed site check errors with a flag `--print-errors`', '-'), ] - assert results == expected_output + assert notifications == expected_output diff --git a/tests/test_executors.py b/tests/test_executors.py index 7a39897..7d96e83 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -3,6 +3,7 @@ import pytest import asyncio import logging +from typing import Any, List, Tuple, Callable, Dict from maigret.executors import ( AsyncioSimpleExecutor, AsyncioProgressbarExecutor, @@ -21,7 +22,7 @@ async def func(n): @pytest.mark.asyncio async def test_simple_asyncio_executor(): - tasks = [(func, [n], {}) for n in range(10)] + tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)] executor = AsyncioSimpleExecutor(logger=logger) assert await executor.run(tasks) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] assert executor.execution_time > 0.2 @@ -30,7 
+31,7 @@ async def test_simple_asyncio_executor(): @pytest.mark.asyncio async def test_asyncio_progressbar_executor(): - tasks = [(func, [n], {}) for n in range(10)] + tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)] executor = AsyncioProgressbarExecutor(logger=logger) # no guarantees for the results order @@ -41,7 +42,7 @@ async def test_asyncio_progressbar_executor(): @pytest.mark.asyncio async def test_asyncio_progressbar_semaphore_executor(): - tasks = [(func, [n], {}) for n in range(10)] + tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)] executor = AsyncioProgressbarSemaphoreExecutor(logger=logger, in_parallel=5) # no guarantees for the results order @@ -53,7 +54,7 @@ async def test_asyncio_progressbar_semaphore_executor(): @pytest.mark.slow @pytest.mark.asyncio async def test_asyncio_progressbar_queue_executor(): - tasks = [(func, [n], {}) for n in range(10)] + tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)] executor = AsyncioProgressbarQueueExecutor(logger=logger, in_parallel=2) assert await executor.run(tasks) == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8] @@ -81,22 +82,22 @@ async def test_asyncio_progressbar_queue_executor(): @pytest.mark.asyncio async def test_asyncio_queue_generator_executor(): - tasks = [(func, [n], {}) for n in range(10)] + tasks: List[Tuple[Callable, list, dict]] = [(func, [n], {}) for n in range(10)] executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=2) - results = [result async for result in executor.run(tasks)] + results = [result async for result in executor.run(tasks)] # type: ignore[arg-type] assert results == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8] assert executor.execution_time > 0.5 assert executor.execution_time < 0.6 executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=3) - results = [result async for result in executor.run(tasks)] + results = [result async for result in executor.run(tasks)] # type: ignore[arg-type] 
assert results == [0, 3, 1, 4, 6, 2, 7, 9, 5, 8] assert executor.execution_time > 0.4 assert executor.execution_time < 0.5 executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=5) - results = [result async for result in executor.run(tasks)] + results = [result async for result in executor.run(tasks)] # type: ignore[arg-type] assert results in ( [0, 3, 6, 1, 4, 7, 9, 2, 5, 8], [0, 3, 6, 1, 4, 9, 7, 2, 5, 8], @@ -105,7 +106,7 @@ async def test_asyncio_queue_generator_executor(): assert executor.execution_time < 0.4 executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=10) - results = [result async for result in executor.run(tasks)] + results = [result async for result in executor.run(tasks)] # type: ignore[arg-type] assert results == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8] assert executor.execution_time > 0.2 assert executor.execution_time < 0.3 diff --git a/tests/test_maigret.py b/tests/test_maigret.py index 5cd5172..87a3c46 100644 --- a/tests/test_maigret.py +++ b/tests/test_maigret.py @@ -158,7 +158,7 @@ def test_extract_ids_from_page(test_db): def test_extract_ids_from_results(test_db): - TEST_EXAMPLE = copy.deepcopy(RESULTS_EXAMPLE) + TEST_EXAMPLE: dict = copy.deepcopy(RESULTS_EXAMPLE) TEST_EXAMPLE['Reddit']['ids_usernames'] = {'test1': 'yandex_public_id'} TEST_EXAMPLE['Reddit']['ids_links'] = ['https://www.reddit.com/user/test2'] diff --git a/tests/test_report.py b/tests/test_report.py index 47c00a0..e5fb88b 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -6,7 +6,7 @@ import os import pytest from io import StringIO -import xmind +import xmind # type: ignore[import-untyped] from jinja2 import Template from maigret.report import ( diff --git a/tests/test_sites.py b/tests/test_sites.py index 6cbf680..464b1cd 100644 --- a/tests/test_sites.py +++ b/tests/test_sites.py @@ -1,8 +1,10 @@ """Maigret Database test functions""" +from typing import Any, Dict + from maigret.sites import MaigretDatabase, MaigretSite -EXAMPLE_DB = { 
+EXAMPLE_DB: Dict[str, Any] = { 'engines': { "XenForo": { "presenseStrs": ["XenForo"], diff --git a/tests/test_submit.py b/tests/test_submit.py index d957d82..97a9895 100644 --- a/tests/test_submit.py +++ b/tests/test_submit.py @@ -28,7 +28,7 @@ async def test_detect_known_engine(test_db, local_test_db): url_exists = "https://devforum.zoom.us/u/adam" url_mainpage = "https://devforum.zoom.us/" # Mock extract_username_dialog to return "adam" - submitter.extract_username_dialog = MagicMock(return_value="adam") + submitter.extract_username_dialog = MagicMock(return_value="adam") # type: ignore[method-assign] sites, resp_text = await submitter.detect_known_engine( url_exists, url_mainpage, session=None, follow_redirects=False, headers=None @@ -111,7 +111,7 @@ async def test_check_features_manually_success(settings): @pytest.mark.slow @pytest.mark.asyncio -async def test_check_features_manually_success(settings): +async def test_check_features_manually_cloudflare(settings): # Setup db = MaigretDatabase() logger = logging.getLogger("test_logger")