Improved "submit new site" mode, added tests, fixed top-500 sites (#1952)

This commit is contained in:
Soxoj
2024-12-10 18:02:43 +01:00
committed by GitHub
parent 51ab988e36
commit 81a817a39f
12 changed files with 691 additions and 194 deletions
+262 -121
View File
@@ -2,7 +2,8 @@ import asyncio
import json
import re
import os
from typing import Any, Dict, List, Optional
import logging
from typing import Any, Dict, List, Optional, Tuple
from aiohttp import ClientSession, TCPConnector
from aiohttp_socks import ProxyConnector
@@ -15,7 +16,7 @@ from .settings import Settings
from .sites import MaigretDatabase, MaigretEngine, MaigretSite
from .utils import get_random_user_agent
from .checking import site_self_check
from .utils import get_match_ratio
from .utils import get_match_ratio, generate_random_username
class CloudflareSession:
@@ -125,21 +126,13 @@ class Submitter:
return fields
async def detect_known_engine(
self, url_exists, url_mainpage
self, url_exists, url_mainpage, session, follow_redirects, headers
) -> [List[MaigretSite], str]:
resp_text = ''
try:
r = await self.session.get(url_mainpage)
content = await r.content.read()
charset = r.charset or "utf-8"
resp_text = content.decode(charset, "ignore")
self.logger.debug(resp_text)
except Exception as e:
self.logger.warning(e, exc_info=True)
print(f"Some error while checking main page: {e}")
return [], resp_text
session = session or self.session
resp_text, _ = await self.get_html_response_to_compare(
url_exists, session, follow_redirects, headers
)
for engine in self.db.engines:
strs_to_check = engine.__dict__.get("presenseStrs")
@@ -195,113 +188,134 @@ class Submitter:
)
return entered_username if entered_username else supposed_username
async def check_features_manually(
self, url_exists, url_mainpage, cookie_file, redirects=False
@staticmethod
async def get_html_response_to_compare(
url: str, session: ClientSession = None, redirects=False, headers: Dict = None
):
custom_headers = {}
while self.args.verbose:
header_key = input(
'Specify custom header if you need or just press Enter to skip. Header name: '
async with session.get(
url, allow_redirects=redirects, headers=headers
) as response:
# Try different encodings or fallback to 'ignore' errors
try:
html_response = await response.text(encoding='utf-8')
except UnicodeDecodeError:
try:
html_response = await response.text(encoding='latin1')
except UnicodeDecodeError:
html_response = await response.text(errors='ignore')
return html_response, response.status
async def check_features_manually(
self,
username: str,
url_exists: str,
cookie_filename="", # TODO: use cookies
session: ClientSession = None,
follow_redirects=False,
headers: dict = None,
) -> Tuple[List[str], List[str], str, str]:
random_username = generate_random_username()
url_of_non_existing_account = url_exists.lower().replace(
username.lower(), random_username
)
try:
session = session or self.session
first_html_response, first_status = await self.get_html_response_to_compare(
url_exists, session, follow_redirects, headers
)
if not header_key:
break
header_value = input('Header value: ')
custom_headers[header_key.strip()] = header_value.strip()
second_html_response, second_status = (
await self.get_html_response_to_compare(
url_of_non_existing_account, session, follow_redirects, headers
)
)
await session.close()
except Exception as e:
self.logger.error(
f"Error while getting HTTP response for username {username}: {e}",
exc_info=True,
)
return None, None, str(e), random_username
supposed_username = self.extract_username_dialog(url_exists)
non_exist_username = "noonewouldeverusethis7"
url_user = url_exists.replace(supposed_username, "{username}")
url_not_exists = url_exists.replace(supposed_username, non_exist_username)
headers = dict(self.HEADERS)
headers.update(custom_headers)
exists_resp = await self.session.get(
url_exists,
headers=headers,
allow_redirects=redirects,
self.logger.info(f"URL with existing account: {url_exists}")
self.logger.info(
f"HTTP response status for URL with existing account: {first_status}"
)
exists_resp_text = await exists_resp.text()
self.logger.debug(url_exists)
self.logger.debug(exists_resp.status)
self.logger.debug(exists_resp_text)
non_exists_resp = await self.session.get(
url_not_exists,
headers=headers,
allow_redirects=redirects,
self.logger.info(
f"HTTP response length URL with existing account: {len(first_html_response)}"
)
non_exists_resp_text = await non_exists_resp.text()
self.logger.debug(url_not_exists)
self.logger.debug(non_exists_resp.status)
self.logger.debug(non_exists_resp_text)
self.logger.debug(first_html_response)
a = exists_resp_text
b = non_exists_resp_text
self.logger.info(f"URL with existing account: {url_of_non_existing_account}")
self.logger.info(
f"HTTP response status for URL with non-existing account: {second_status}"
)
self.logger.info(
f"HTTP response length URL with non-existing account: {len(second_html_response)}"
)
self.logger.debug(second_html_response)
tokens_a = set(re.split(f'[{self.SEPARATORS}]', a))
tokens_b = set(re.split(f'[{self.SEPARATORS}]', b))
# TODO: filter by errors, move to dialog function
if (
"/cdn-cgi/challenge-platform" in first_html_response
or "\t\t\t\tnow: " in first_html_response
or "Sorry, you have been blocked" in first_html_response
):
self.logger.info("Cloudflare detected, skipping")
return None, None, "Cloudflare detected, skipping", random_username
tokens_a = set(re.split(f'[{self.SEPARATORS}]', first_html_response))
tokens_b = set(re.split(f'[{self.SEPARATORS}]', second_html_response))
a_minus_b = tokens_a.difference(tokens_b)
b_minus_a = tokens_b.difference(tokens_a)
# additional filtering by html response
a_minus_b = [t for t in a_minus_b if t not in non_exists_resp_text]
b_minus_a = [t for t in b_minus_a if t not in exists_resp_text]
a_minus_b = list(map(lambda x: x.strip('\\'), a_minus_b))
b_minus_a = list(map(lambda x: x.strip('\\'), b_minus_a))
# Filter out strings containing usernames
a_minus_b = [s for s in a_minus_b if username.lower() not in s.lower()]
b_minus_a = [s for s in b_minus_a if random_username.lower() not in s.lower()]
def filter_tokens(token: str, html_response: str) -> bool:
is_in_html = token in html_response
is_long_str = len(token) >= 50
is_number = re.match(r'^\d\.?\d+$', token) or re.match(r':^\d+$', token)
is_whitelisted_number = token in ['200', '404', '403']
return not (
is_in_html or is_long_str or (is_number and not is_whitelisted_number)
)
a_minus_b = list(
filter(lambda t: filter_tokens(t, second_html_response), a_minus_b)
)
b_minus_a = list(
filter(lambda t: filter_tokens(t, first_html_response), b_minus_a)
)
if len(a_minus_b) == len(b_minus_a) == 0:
print("The pages for existing and non-existing account are the same!")
top_features_count = int(
input(
f"Specify count of features to extract [default {self.TOP_FEATURES}]: "
return (
None,
None,
"HTTP responses for pages with existing and non-existing accounts are the same",
random_username,
)
or self.TOP_FEATURES
)
match_fun = get_match_ratio(self.settings.presence_strings)
presence_list = sorted(a_minus_b, key=match_fun, reverse=True)[
:top_features_count
: self.TOP_FEATURES
]
self.logger.debug([(keyword, match_fun(keyword)) for keyword in presence_list])
print("Detected text features of existing account: " + ", ".join(presence_list))
features = input("If features was not detected correctly, write it manually: ")
if features:
presence_list = list(map(str.strip, features.split(",")))
absence_list = sorted(b_minus_a, key=match_fun, reverse=True)[
:top_features_count
: self.TOP_FEATURES
]
self.logger.debug([(keyword, match_fun(keyword)) for keyword in absence_list])
print(
"Detected text features of non-existing account: " + ", ".join(absence_list)
)
features = input("If features was not detected correctly, write it manually: ")
self.logger.info(f"Detected presence features: {presence_list}")
self.logger.info(f"Detected absence features: {absence_list}")
if features:
absence_list = list(map(str.strip, features.split(",")))
site_data = {
"absenceStrs": absence_list,
"presenseStrs": presence_list,
"url": url_user,
"urlMain": url_mainpage,
"usernameClaimed": supposed_username,
"usernameUnclaimed": non_exist_username,
"checkType": "message",
}
if headers != self.HEADERS:
site_data['headers'] = headers
site = MaigretSite(url_mainpage.split("/")[-1], site_data)
return site
return presence_list, absence_list, "Found", random_username
async def add_site(self, site):
sem = asyncio.Semaphore(1)
@@ -376,6 +390,12 @@ class Submitter:
}
async def dialog(self, url_exists, cookie_file):
old_site = None
additional_options_enabled = self.logger.level in (
logging.DEBUG,
logging.WARNING,
)
domain_raw = self.URL_RE.sub("", url_exists).strip().strip("/")
domain_raw = domain_raw.split("/")[0]
self.logger.info('Domain is %s', domain_raw)
@@ -386,9 +406,11 @@ class Submitter:
)
if matched_sites:
# TODO: update the existing site
print(
f'Sites with domain "{domain_raw}" already exists in the Maigret database!'
f"{Fore.YELLOW}[!] Sites with domain \"{domain_raw}\" already exists in the Maigret database!{Style.RESET_ALL}"
)
status = lambda s: "(disabled)" if s.disabled else ""
url_block = lambda s: f"\n\t{s.url_main}\n\t{s.url}"
print(
@@ -400,16 +422,62 @@ class Submitter:
)
)
if input("Do you want to continue? [yN] ").lower() in "n":
if (
input(
f"{Fore.GREEN}[?] Do you want to continue? [yN] {Style.RESET_ALL}"
).lower()
in "n"
):
return False
site_names = [site.name for site in matched_sites]
site_name = (
input(
f"{Fore.GREEN}[?] Which site do you want to update in case of success? 1st by default. [{', '.join(site_names)}] {Style.RESET_ALL}"
)
or matched_sites[0].name
)
old_site = next(
(site for site in matched_sites if site.name == site_name), None
)
print(
f'{Fore.GREEN}[+] We will update site "{old_site.name}" in case of success.{Style.RESET_ALL}'
)
url_mainpage = self.extract_mainpage_url(url_exists)
# headers update
custom_headers = dict(self.HEADERS)
while additional_options_enabled:
header_key = input(
f'{Fore.GREEN}[?] Specify custom header if you need or just press Enter to skip. Header name: {Style.RESET_ALL}'
)
if not header_key:
break
header_value = input(f'{Fore.GREEN}[?] Header value: {Style.RESET_ALL}')
custom_headers[header_key.strip()] = header_value.strip()
# redirects settings update
redirects = False
if additional_options_enabled:
redirects = (
'y'
in input(
f'{Fore.GREEN}[?] Should we do redirects automatically? [yN] {Style.RESET_ALL}'
).lower()
)
print('Detecting site engine, please wait...')
sites = []
text = None
try:
sites, text = await self.detect_known_engine(url_exists, url_exists)
sites, text = await self.detect_known_engine(
url_exists,
url_exists,
session=None,
follow_redirects=redirects,
headers=custom_headers,
)
except KeyboardInterrupt:
print('Engine detect process is interrupted.')
@@ -422,26 +490,48 @@ class Submitter:
if not sites:
print("Unable to detect site engine, lets generate checking features")
redirects = False
if self.args.verbose:
redirects = (
'y' in input('Should we do redirects automatically? [yN] ').lower()
)
supposed_username = self.extract_username_dialog(url_exists)
self.logger.info(f"Supposed username: {supposed_username}")
sites = [
presence_list, absence_list, status, non_exist_username = (
await self.check_features_manually(
url_exists,
url_mainpage,
cookie_file,
redirects,
username=supposed_username,
url_exists=url_exists,
cookie_filename=cookie_file,
follow_redirects=redirects,
headers=custom_headers,
)
]
)
if status == "Found":
site_data = {
"absenceStrs": absence_list,
"presenseStrs": presence_list,
"url": url_exists.replace(supposed_username, '{username}'),
"urlMain": url_mainpage,
"usernameClaimed": supposed_username,
"usernameUnclaimed": non_exist_username,
"checkType": "message",
}
self.logger.info(json.dumps(site_data, indent=4))
if custom_headers != self.HEADERS:
site_data['headers'] = custom_headers
site = MaigretSite(url_mainpage.split("/")[-1], site_data)
sites.append(site)
else:
print(
f"{Fore.RED}[!] The check for site failed! Reason: {status}{Style.RESET_ALL}"
)
return False
self.logger.debug(sites[0].__dict__)
sem = asyncio.Semaphore(1)
print("Checking, please wait...")
print(f"{Fore.GREEN}[*] Checking, please wait...{Style.RESET_ALL}")
found = False
chosen_site = None
for s in sites:
@@ -463,7 +553,7 @@ class Submitter:
else:
if (
input(
f"Site {chosen_site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] "
f"{Fore.GREEN}[?] Site {chosen_site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] {Style.RESET_ALL}"
)
.lower()
.strip("y")
@@ -471,22 +561,73 @@ class Submitter:
return False
if self.args.verbose:
source = input("Name the source site if it is mirror: ")
self.logger.info(
"Verbose mode is enabled, additional settings are available"
)
source = input(
f"{Fore.GREEN}[?] Name the source site if it is mirror: {Style.RESET_ALL}"
)
if source:
chosen_site.source = source
chosen_site.name = input("Change site name if you want: ") or chosen_site.name
chosen_site.tags = list(map(str.strip, input("Site tags: ").split(',')))
default_site_name = old_site.name if old_site else chosen_site.name
new_name = (
input(
f"{Fore.GREEN}[?] Change site name if you want [{default_site_name}]: {Style.RESET_ALL}"
)
or default_site_name
)
if new_name != default_site_name:
self.logger.info(f"New site name is {new_name}")
chosen_site.name = new_name
# TODO: remove empty tags
new_tags = input(f"{Fore.GREEN}[?] Site tags: {Style.RESET_ALL}")
if new_tags:
chosen_site.tags = list(map(str.strip, new_tags.split(',')))
else:
chosen_site.tags = []
self.logger.info(f"Site tags are: {', '.join(chosen_site.tags)}")
# rank = Submitter.get_alexa_rank(chosen_site.url_main)
# if rank:
# print(f'New alexa rank: {rank}')
# chosen_site.alexa_rank = rank
self.logger.debug(chosen_site.json)
self.logger.info(chosen_site.json)
site_data = chosen_site.strip_engine_data()
self.logger.debug(site_data.json)
self.db.update_site(site_data)
self.logger.info(site_data.json)
if old_site:
# Update old site with new values and log changes
fields_to_check = {
'url': 'URL',
'url_main': 'Main URL',
'username_claimed': 'Username claimed',
'username_unclaimed': 'Username unclaimed',
'check_type': 'Check type',
'presense_strs': 'Presence strings',
'absence_strs': 'Absence strings',
'tags': 'Tags',
'source': 'Source',
'headers': 'Headers',
}
for field, display_name in fields_to_check.items():
old_value = getattr(old_site, field)
new_value = getattr(site_data, field)
if field == 'tags' and not new_tags:
continue
if str(old_value) != str(new_value):
print(
f"{Fore.YELLOW}[*] '{display_name}' updated: {Fore.RED}{old_value} {Fore.YELLOW}to {Fore.GREEN}{new_value}{Style.RESET_ALL}"
)
old_site.__dict__[field] = new_value
# update the site
final_site = old_site if old_site else site_data
self.db.update_site(final_site)
# save the db in file
if self.args.db_file != self.settings.sites_db_path:
print(
f"{Fore.GREEN}[+] Maigret DB is saved to {self.args.db}.{Style.RESET_ALL}"