Refactored sites module, updated documentation (#1918)

This commit is contained in:
Soxoj
2024-12-01 11:41:41 +01:00
committed by GitHub
parent 5073ceff13
commit 2f93963a0a
8 changed files with 190 additions and 85 deletions
+1 -2
View File
@@ -11,7 +11,6 @@ from urllib.parse import quote
# Third party imports
import aiodns
import alive_progress
from alive_progress import alive_bar
from aiohttp import ClientSession, TCPConnector, http_exceptions
from aiohttp.client_exceptions import ClientConnectorError, ServerDisconnectedError
@@ -127,7 +126,7 @@ class SimpleAiohttpChecker(CheckerBase):
async with ClientSession(
connector=connector,
trust_env=True,
cookie_jar=self.cookie_jar.copy() if self.cookie_jar else None
cookie_jar=self.cookie_jar.copy() if self.cookie_jar else None,
) as session:
html_text, status_code, error = await self._make_request(
session,
+70 -54
View File
@@ -21,6 +21,7 @@ class MaigretEngine:
class MaigretSite:
# Fields that should not be serialized when converting site to JSON
NOT_SERIALIZABLE_FIELDS = [
"name",
"engineData",
@@ -31,37 +32,65 @@ class MaigretSite:
"urlRegexp",
]
# Username known to exist on the site
username_claimed = ""
# Username known to not exist on the site
username_unclaimed = ""
# Additional URL path component, e.g. /forum in https://example.com/forum/users/{username}
url_subpath = ""
# Main site URL (the main page)
url_main = ""
# Full URL pattern for username page, e.g. https://example.com/forum/users/{username}
url = ""
# Whether site is disabled. Not used by Maigret without --use-disabled argument
disabled = False
# Whether a positive result indicates accounts with similar usernames rather than exact matches
similar_search = False
# Whether to ignore 403 status codes
ignore403 = False
# Site category tags
tags: List[str] = []
# Type of identifier (username, gaia_id etc); see SUPPORTED_IDS in checking.py
type = "username"
# Custom HTTP headers
headers: Dict[str, str] = {}
# Error message substrings
errors: Dict[str, str] = {}
# Site activation requirements
activation: Dict[str, Any] = {}
# Regular expression for username validation
regex_check = None
# URL to probe site status
url_probe = None
# Type of check to perform
check_type = ""
# Whether to only send HEAD requests (GET by default)
request_head_only = ""
# GET parameters to include in requests
get_params: Dict[str, Any] = {}
# Substrings in HTML response that indicate profile exists
presense_strs: List[str] = []
# Substrings in HTML response that indicate profile doesn't exist
absence_strs: List[str] = []
# Site statistics
stats: Dict[str, Any] = {}
# Site engine name
engine = None
# Engine-specific configuration
engine_data: Dict[str, Any] = {}
# Engine instance
engine_obj: Optional["MaigretEngine"] = None
# Future for async requests
request_future = None
# Alexa traffic rank
alexa_rank = None
# Source (in case a site is a mirror of another site)
source = None
# URL protocol (http/https)
protocol = ''
def __init__(self, name, information):
@@ -96,20 +125,21 @@ class MaigretSite:
def __eq__(self, other):
if isinstance(other, MaigretSite):
# Compare only relevant attributes, not internal state like request_future
attrs_to_compare = ['name', 'url_main', 'url_subpath', 'type', 'headers',
'errors', 'activation', 'regex_check', 'url_probe',
'check_type', 'request_head_only', 'get_params',
'presense_strs', 'absence_strs', 'stats', 'engine',
'engine_data', 'alexa_rank', 'source', 'protocol']
attrs_to_compare = [
'name', 'url_main', 'url_subpath', 'type', 'headers',
'errors', 'activation', 'regex_check', 'url_probe',
'check_type', 'request_head_only', 'get_params',
'presense_strs', 'absence_strs', 'stats', 'engine',
'engine_data', 'alexa_rank', 'source', 'protocol'
]
return all(getattr(self, attr) == getattr(other, attr)
for attr in attrs_to_compare)
for attr in attrs_to_compare)
elif isinstance(other, str):
# Compare only by name (exactly) or url_main (partial similarity)
return self.__is_equal_by_url_or_name(other)
return False
def update_detectors(self):
if "url" in self.__dict__:
url = self.url
@@ -474,78 +504,64 @@ class MaigretDatabase:
return results
def get_db_stats(self, is_markdown=False):
# Initialize counters
sites_dict = self.sites_dict
urls = {}
tags = {}
output = ""
disabled_count = 0
total_count = len(sites_dict)
message_checks = 0
message_checks_one_factor = 0
status_checks = 0
for _, site in sites_dict.items():
# Collect statistics
for site in sites_dict.values():
# Count disabled sites
if site.disabled:
disabled_count += 1
# Count URL types
url_type = site.get_url_template()
urls[url_type] = urls.get(url_type, 0) + 1
if site.check_type == 'message' and not site.disabled:
message_checks += 1
if site.absence_strs and site.presense_strs:
continue
message_checks_one_factor += 1
if site.check_type == 'status_code':
status_checks += 1
# Count check types for enabled sites
if not site.disabled:
if site.check_type == 'message':
if not (site.absence_strs and site.presense_strs):
message_checks_one_factor += 1
elif site.check_type == 'status_code':
status_checks += 1
# Count tags
if not site.tags:
tags["NO_TAGS"] = tags.get("NO_TAGS", 0) + 1
for tag in filter(lambda x: not is_country_tag(x), site.tags):
tags[tag] = tags.get(tag, 0) + 1
# Calculate percentages
total_count = len(sites_dict)
enabled_count = total_count - disabled_count
enabled_perc = round(100 * enabled_count / total_count, 2)
output += (
f"Enabled/total sites: {enabled_count}/{total_count} = {enabled_perc}%\n\n"
)
checks_perc = round(100 * message_checks_one_factor / enabled_count, 2)
output += f"Incomplete message checks: {message_checks_one_factor}/{enabled_count} = {checks_perc}% (false positive risks)\n\n"
status_checks_perc = round(100 * status_checks / enabled_count, 2)
output += f"Status code checks: {status_checks}/{enabled_count} = {status_checks_perc}% (false positive risks)\n\n"
output += (
f"False positive risk (total): {checks_perc+status_checks_perc:.2f}%\n\n"
)
# Format output
separator = "\n\n"
output = [
f"Enabled/total sites: {enabled_count}/{total_count} = {enabled_perc}%",
f"Incomplete message checks: {message_checks_one_factor}/{enabled_count} = {checks_perc}% (false positive risks)",
f"Status code checks: {status_checks}/{enabled_count} = {status_checks_perc}% (false positive risks)",
f"False positive risk (total): {checks_perc + status_checks_perc:.2f}%",
self._format_top_items("profile URLs", urls, 20, is_markdown),
self._format_top_items("tags", tags, 20, is_markdown, self._tags),
]
top_urls_count = 20
output += f"Top {top_urls_count} profile URLs:\n"
for url, count in sorted(urls.items(), key=lambda x: x[1], reverse=True)[
:top_urls_count
]:
return separator.join(output)
def _format_top_items(self, title, items_dict, limit, is_markdown, valid_items=None):
"""Helper method to format top items lists"""
output = f"Top {limit} {title}:\n"
for item, count in sorted(items_dict.items(), key=lambda x: x[1], reverse=True)[:limit]:
if count == 1:
break
output += f"- ({count})\t`{url}`\n" if is_markdown else f"{count}\t{url}\n"
top_tags_count = 20
output += f"\nTop {top_tags_count} tags:\n"
for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True)[
:top_tags_count
]:
mark = ""
if tag not in self._tags:
mark = " (non-standard)"
output += (
f"- ({count})\t`{tag}`{mark}\n"
if is_markdown
else f"{count}\t{tag}{mark}\n"
)
mark = " (non-standard)" if valid_items is not None and item not in valid_items else ""
output += f"- ({count})\t`{item}`{mark}\n" if is_markdown else f"{count}\t{item}{mark}\n"
return output