Merge pull request #169 from soxoj/submit-mode-refactoring

Refactoring of the submit module, plus some fixes:

- submit_dialog() is replaced by a Submitter class constructed with the database, logger, and a new Settings object
- the supported tag list moves from the SUPPORTED_TAGS constant in sites.py into a "tags" key in data.json, loaded into MaigretDatabase._tags
- dialog heuristics (presence strings, supposed usernames) move into a new resources/settings.json read by the Settings class
- MaigretSite.get_url_type() is renamed to get_url_template() and falls back to "(no engine)"
- get_match_ratio() moves to utils.py as a closure over a configurable base list
- rotating auth tokens in data.json are refreshed, and the Makefile pull target now pulls origin main instead of head
soxoj
2021-06-13 00:45:37 +03:00
committed by GitHub
11 changed files with 534 additions and 438 deletions
+1 -1
@@ -25,7 +25,7 @@ format:
pull:
git stash
git checkout main
git pull origin head
git pull origin main
git stash pop
clean:
+10 -4
@@ -36,9 +36,10 @@ from .report import (
sort_report_by_data_points,
)
from .sites import MaigretDatabase
from .submit import submit_dialog
from .submit import Submitter
from .types import QueryResultWrapper
from .utils import get_dict_ascii_tree
from .settings import Settings
def notify_about_errors(search_results: QueryResultWrapper, query_notify):
@@ -496,6 +497,12 @@ async def main():
if args.tags:
args.tags = list(set(str(args.tags).split(',')))
settings = Settings(
os.path.join(
os.path.dirname(os.path.realpath(__file__)), "resources/settings.json"
)
)
if args.db_file is None:
args.db_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "resources/data.json"
@@ -526,9 +533,8 @@ async def main():
site_data = get_top_sites_for_id(args.id_type)
if args.new_site_to_submit:
is_submitted = await submit_dialog(
db, args.new_site_to_submit, args.cookie_file, logger
)
submitter = Submitter(db=db, logger=logger, settings=settings)
is_submitted = await submitter.dialog(args.new_site_to_submit, args.cookie_file)
if is_submitted:
db.save_to_file(args.db_file)
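The hunk above shows the new wiring in main(): a Settings object is built from resources/settings.json and handed, together with the database and logger, to a Submitter whose dialog() coroutine replaces the old submit_dialog() function. A minimal standalone sketch of the same flow; the resource paths and the load_from_file helper are assumptions, not shown in this hunk:

import asyncio
import logging

from maigret.settings import Settings
from maigret.sites import MaigretDatabase
from maigret.submit import Submitter

async def submit_new_site(url: str) -> bool:
    logger = logging.getLogger("maigret")
    # Path and loader assumed for illustration; main() resolves paths from __file__.
    db = MaigretDatabase().load_from_file("maigret/resources/data.json")
    settings = Settings("maigret/resources/settings.json")
    submitter = Submitter(db=db, logger=logger, settings=settings)
    is_submitted = await submitter.dialog(url, None)  # second argument: cookie file
    if is_submitted:
        db.save_to_file("maigret/resources/data.json")
    return is_submitted

# asyncio.run(submit_new_site("https://example.com/u/alice"))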
+64 -5
@@ -13036,7 +13036,7 @@
"us"
],
"headers": {
"authorization": "Bearer BQCypIuUtz7zDFov8xN86mj1BelLf7Apf9WBaC5yYfNkmGe4r7Hz4Awp6dqPuCAP9K9F5yYtjbyZX_vlr4I"
"authorization": "Bearer BQAkHoH1XLhjIl6oh6r9YzH3kHC1OZg3UXgLiz39FzqRFh_xQrFaVrZcU-esM-t87B6Hqdc4L1HBgukKnWE"
},
"errors": {
"Spotify is currently not available in your country.": "Access denied in your country, use proxy/vpn"
@@ -13990,7 +13990,8 @@
"us"
],
"errors": {
"Website unavailable": "Site error"
"Website unavailable": "Site error",
"is currently offline": "Site error"
},
"checkType": "message",
"absenceStrs": [
@@ -14462,7 +14463,7 @@
"sec-ch-ua": "Google Chrome\";v=\"87\", \" Not;A Brand\";v=\"99\", \"Chromium\";v=\"87\"",
"authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
"x-guest-token": "1400174453577900043"
"x-guest-token": "1403829602053771266"
},
"errors": {
"Bad guest token": "x-guest-token update required"
@@ -14869,7 +14870,7 @@
"video"
],
"headers": {
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MjI2NjcxMjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.V4VVbLzNwPU21rNP5moSxrPcPw--C7_Qz9VHgcJc1CA"
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MjM1MzQ5NjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.5T8_p_q9zXOHXI2FT_XtMhsZUJMtPgCIaqwVF2u4aZI"
},
"activation": {
"url": "https://vimeo.com/_rv/viewer",
@@ -28457,5 +28458,63 @@
]
}
}
}
},
"tags": [
"gaming",
"coding",
"photo",
"music",
"blog",
"finance",
"freelance",
"dating",
"tech",
"forum",
"porn",
"erotic",
"webcam",
"video",
"movies",
"hacking",
"art",
"discussion",
"sharing",
"writing",
"wiki",
"business",
"shopping",
"sport",
"books",
"news",
"documents",
"travel",
"maps",
"hobby",
"apps",
"classified",
"career",
"geosocial",
"streaming",
"education",
"networking",
"torrent",
"science",
"medicine",
"reading",
"stock",
"messaging",
"trading",
"links",
"fashion",
"tasks",
"military",
"auto",
"gambling",
"cybercriminal",
"review",
"bookmarks",
"design",
"tor",
"i2p"
]
}
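The tail of this hunk adds a top-level "tags" key to data.json. As the sites.py diff below shows, the database loader merges that list into MaigretDatabase._tags, replacing the removed SUPPORTED_TAGS constant. A quick membership check, assuming the stock data file and a load_from_file helper:

from maigret.sites import MaigretDatabase

db = MaigretDatabase().load_from_file("maigret/resources/data.json")  # path/loader assumed
print("geosocial" in db._tags)  # True once the "tags" key above is loaded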
+17
@@ -0,0 +1,17 @@
{
"presence_strings": [
"username",
"not found",
"пользователь",
"profile",
"lastname",
"firstname",
"biography",
"birthday",
"репутация",
"информация",
"e-mail"
],
"supposed_usernames": [
"alex", "god", "admin", "red", "blue", "john"]
}
+29
@@ -0,0 +1,29 @@
import json
class Settings:
presence_strings: list
supposed_usernames: list
def __init__(self, filename):
data = {}
try:
with open(filename, "r", encoding="utf-8") as file:
try:
data = json.load(file)
except Exception as error:
raise ValueError(
f"Problem with parsing json contents of "
f"settings file '{filename}': {str(error)}."
)
except FileNotFoundError as error:
raise FileNotFoundError(
f"Problem while attempting to access settings file '{filename}'."
) from error
self.__dict__.update(data)
@property
def json(self):
return self.__dict__
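A short usage sketch for the new Settings class, with the file from the previous hunk: keys from the JSON become attributes via __dict__.update(), and the json property hands the same dict back.

from maigret.settings import Settings

settings = Settings("maigret/resources/settings.json")  # path as used in maigret.py
print(settings.presence_strings[0])         # "username"
print(settings.supposed_usernames)          # ["alex", "god", "admin", "red", "blue", "john"]
print("presence_strings" in settings.json)  # True: the property exposes __dict__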
+11 -66
@@ -9,66 +9,6 @@ import requests
from .utils import CaseConverter, URLMatcher, is_country_tag
# TODO: move to data.json
SUPPORTED_TAGS = [
"gaming",
"coding",
"photo",
"music",
"blog",
"finance",
"freelance",
"dating",
"tech",
"forum",
"porn",
"erotic",
"webcam",
"video",
"movies",
"hacking",
"art",
"discussion",
"sharing",
"writing",
"wiki",
"business",
"shopping",
"sport",
"books",
"news",
"documents",
"travel",
"maps",
"hobby",
"apps",
"classified",
"career",
"geosocial",
"streaming",
"education",
"networking",
"torrent",
"science",
"medicine",
"reading",
"stock",
"messaging",
"trading",
"links",
"fashion",
"tasks",
"military",
"auto",
"gambling",
"cybercriminal",
"review",
"bookmarks",
"design",
"tor",
"i2p",
]
class MaigretEngine:
site: Dict[str, Any] = {}
@@ -204,12 +144,12 @@ class MaigretSite:
errors.update(self.errors)
return errors
def get_url_type(self) -> str:
def get_url_template(self) -> str:
url = URLMatcher.extract_main_part(self.url)
if url.startswith("{username}"):
url = "SUBDOMAIN"
elif url == "":
url = f"{self.url} ({self.engine})"
url = f"{self.url} ({self.engine or 'no engine'})"
else:
parts = url.split("/")
url = "/" + "/".join(parts[1:])
@@ -273,8 +213,9 @@ class MaigretSite:
class MaigretDatabase:
def __init__(self):
self._sites = []
self._engines = []
self._tags: list = []
self._sites: list = []
self._engines: list = []
@property
def sites(self):
@@ -354,6 +295,7 @@ class MaigretDatabase:
db_data = {
"sites": {site.name: site.strip_engine_data().json for site in self._sites},
"engines": {engine.name: engine.json for engine in self._engines},
"tags": self._tags,
}
json_data = json.dumps(db_data, indent=4)
@@ -367,6 +309,9 @@ class MaigretDatabase:
# Add all of site information from the json file to internal site list.
site_data = json_data.get("sites", {})
engines_data = json_data.get("engines", {})
tags = json_data.get("tags", [])
self._tags += tags
for engine_name in engines_data:
self._engines.append(MaigretEngine(engine_name, engines_data[engine_name]))
@@ -469,7 +414,7 @@ class MaigretDatabase:
if site.disabled:
disabled_count += 1
url_type = site.get_url_type()
url_type = site.get_url_template()
urls[url_type] = urls.get(url_type, 0) + 1
if not site.tags:
@@ -488,7 +433,7 @@ class MaigretDatabase:
output += "Top tags:\n"
for tag, count in sorted(tags.items(), key=lambda x: x[1], reverse=True)[:200]:
mark = ""
if tag not in SUPPORTED_TAGS:
if tag not in self._tags:
mark = " (non-standard)"
output += f"{count}\t{tag}{mark}\n"
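With the constant gone, the stats code above marks any site tag missing from the database's own tag list. A small sketch of that marking logic; the site is constructed by hand here, and the "tags" constructor key is an assumption based on MaigretSite's generic attribute loading:

from maigret.sites import MaigretDatabase, MaigretSite

db = MaigretDatabase()
db._tags = ["coding", "gaming"]  # normally merged in from data.json's "tags" key
site = MaigretSite("example", {"urlMain": "https://example.com", "tags": ["coding", "warez"]})
for tag in site.tags:
    mark = "" if tag in db._tags else " (non-standard)"
    print(f"{tag}{mark}")  # coding / warez (non-standard)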
+352 -360
@@ -1,5 +1,4 @@
import asyncio
import difflib
import re
from typing import List
import xml.etree.ElementTree as ET
@@ -8,382 +7,375 @@ import requests
from .activation import import_aiohttp_cookies
from .checking import maigret
from .result import QueryStatus
from .settings import Settings
from .sites import MaigretDatabase, MaigretSite, MaigretEngine
from .utils import get_random_user_agent
from .utils import get_random_user_agent, get_match_ratio
DESIRED_STRINGS = [
"username",
"not found",
"пользователь",
"profile",
"lastname",
"firstname",
"biography",
"birthday",
"репутация",
"информация",
"e-mail",
]
SUPPOSED_USERNAMES = ["alex", "god", "admin", "red", "blue", "john"]
HEADERS = {
"User-Agent": get_random_user_agent(),
}
SEPARATORS = "\"'"
RATIO = 0.6
TOP_FEATURES = 5
URL_RE = re.compile(r"https?://(www\.)?")
def get_match_ratio(x):
return round(
max(
[difflib.SequenceMatcher(a=x.lower(), b=y).ratio() for y in DESIRED_STRINGS]
),
2,
)
def get_alexa_rank(site_url_main):
url = f"http://data.alexa.com/data?cli=10&url={site_url_main}"
xml_data = requests.get(url).text
root = ET.fromstring(xml_data)
alexa_rank = 0
try:
alexa_rank = int(root.find('.//REACH').attrib['RANK'])
except Exception:
pass
return alexa_rank
def extract_mainpage_url(url):
return "/".join(url.split("/", 3)[:3])
async def site_self_check(site, logger, semaphore, db: MaigretDatabase, silent=False):
changes = {
"disabled": False,
class Submitter:
HEADERS = {
"User-Agent": get_random_user_agent(),
}
check_data = [
(site.username_claimed, QueryStatus.CLAIMED),
(site.username_unclaimed, QueryStatus.AVAILABLE),
]
SEPARATORS = "\"'"
logger.info(f"Checking {site.name}...")
RATIO = 0.6
TOP_FEATURES = 5
URL_RE = re.compile(r"https?://(www\.)?")
for username, status in check_data:
results_dict = await maigret(
username=username,
site_dict={site.name: site},
logger=logger,
timeout=30,
id_type=site.type,
forced=True,
no_progressbar=True,
)
def __init__(self, db: MaigretDatabase, settings: Settings, logger):
self.settings = settings
self.db = db
self.logger = logger
# don't disable entries with other ids types
# TODO: make normal checking
if site.name not in results_dict:
logger.info(results_dict)
changes["disabled"] = True
continue
@staticmethod
def get_alexa_rank(site_url_main):
url = f"http://data.alexa.com/data?cli=10&url={site_url_main}"
xml_data = requests.get(url).text
root = ET.fromstring(xml_data)
alexa_rank = 0
result = results_dict[site.name]["status"]
try:
alexa_rank = int(root.find('.//REACH').attrib['RANK'])
except Exception:
pass
site_status = result.status
return alexa_rank
if site_status != status:
if site_status == QueryStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
logger.warning(
"Error while searching '%s' in %s: %s, %s, check type %s",
username,
site.name,
result.context,
msgs,
etype,
)
# don't disable in case of available username
if status == QueryStatus.CLAIMED:
changes["disabled"] = True
elif status == QueryStatus.CLAIMED:
logger.warning(
f"Not found `{username}` in {site.name}, must be claimed"
)
logger.info(results_dict[site.name])
changes["disabled"] = True
else:
logger.warning(f"Found `{username}` in {site.name}, must be available")
logger.info(results_dict[site.name])
changes["disabled"] = True
@staticmethod
def extract_mainpage_url(url):
return "/".join(url.split("/", 3)[:3])
logger.info(f"Site {site.name} checking is finished")
async def site_self_check(self, site, semaphore, silent=False):
changes = {
"disabled": False,
}
return changes
def generate_additional_fields_dialog(engine: MaigretEngine, dialog):
fields = {}
if 'urlSubpath' in engine.site.get('url', ''):
msg = (
'Detected engine suppose additional URL subpath using (/forum/, /blog/, etc). '
'Enter in manually if it exists: '
)
subpath = input(msg).strip('/')
if subpath:
fields['urlSubpath'] = f'/{subpath}'
return fields
async def detect_known_engine(
db, url_exists, url_mainpage, logger
) -> List[MaigretSite]:
try:
r = requests.get(url_mainpage)
logger.debug(r.text)
except Exception as e:
logger.warning(e)
print("Some error while checking main page")
return []
for engine in db.engines:
strs_to_check = engine.__dict__.get("presenseStrs")
if strs_to_check and r and r.text:
all_strs_in_response = True
for s in strs_to_check:
if s not in r.text:
all_strs_in_response = False
sites = []
if all_strs_in_response:
engine_name = engine.__dict__.get("name")
print(f"Detected engine {engine_name} for site {url_mainpage}")
usernames_to_check = SUPPOSED_USERNAMES
supposed_username = extract_username_dialog(url_exists)
if supposed_username:
usernames_to_check = [supposed_username] + usernames_to_check
add_fields = generate_additional_fields_dialog(engine, url_exists)
for u in usernames_to_check:
site_data = {
"urlMain": url_mainpage,
"name": url_mainpage.split("//")[1],
"engine": engine_name,
"usernameClaimed": u,
"usernameUnclaimed": "noonewouldeverusethis7",
**add_fields,
}
logger.info(site_data)
maigret_site = MaigretSite(url_mainpage.split("/")[-1], site_data)
maigret_site.update_from_engine(db.engines_dict[engine_name])
sites.append(maigret_site)
return sites
return []
def extract_username_dialog(url):
url_parts = url.rstrip("/").split("/")
supposed_username = url_parts[-1].strip('@')
entered_username = input(
f'Is "{supposed_username}" a valid username? If not, write it manually: '
)
return entered_username if entered_username else supposed_username
async def check_features_manually(
db, url_exists, url_mainpage, cookie_file, logger, redirects=False
):
custom_headers = {}
while True:
header_key = input(
'Specify custom header if you need or just press Enter to skip. Header name: '
)
if not header_key:
break
header_value = input('Header value: ')
custom_headers[header_key.strip()] = header_value.strip()
supposed_username = extract_username_dialog(url_exists)
non_exist_username = "noonewouldeverusethis7"
url_user = url_exists.replace(supposed_username, "{username}")
url_not_exists = url_exists.replace(supposed_username, non_exist_username)
headers = dict(HEADERS)
headers.update(custom_headers)
# cookies
cookie_dict = None
if cookie_file:
logger.info(f'Use {cookie_file} for cookies')
cookie_jar = import_aiohttp_cookies(cookie_file)
cookie_dict = {c.key: c.value for c in cookie_jar}
exists_resp = requests.get(
url_exists, cookies=cookie_dict, headers=headers, allow_redirects=redirects
)
logger.debug(url_exists)
logger.debug(exists_resp.status_code)
logger.debug(exists_resp.text)
non_exists_resp = requests.get(
url_not_exists, cookies=cookie_dict, headers=headers, allow_redirects=redirects
)
logger.debug(url_not_exists)
logger.debug(non_exists_resp.status_code)
logger.debug(non_exists_resp.text)
a = exists_resp.text
b = non_exists_resp.text
tokens_a = set(re.split(f'[{SEPARATORS}]', a))
tokens_b = set(re.split(f'[{SEPARATORS}]', b))
a_minus_b = tokens_a.difference(tokens_b)
b_minus_a = tokens_b.difference(tokens_a)
if len(a_minus_b) == len(b_minus_a) == 0:
print("The pages for existing and non-existing account are the same!")
top_features_count = int(
input(f"Specify count of features to extract [default {TOP_FEATURES}]: ")
or TOP_FEATURES
)
presence_list = sorted(a_minus_b, key=get_match_ratio, reverse=True)[
:top_features_count
]
print("Detected text features of existing account: " + ", ".join(presence_list))
features = input("If features was not detected correctly, write it manually: ")
if features:
presence_list = list(map(str.strip, features.split(",")))
absence_list = sorted(b_minus_a, key=get_match_ratio, reverse=True)[
:top_features_count
]
print("Detected text features of non-existing account: " + ", ".join(absence_list))
features = input("If features was not detected correctly, write it manually: ")
if features:
absence_list = list(map(str.strip, features.split(",")))
site_data = {
"absenceStrs": absence_list,
"presenseStrs": presence_list,
"url": url_user,
"urlMain": url_mainpage,
"usernameClaimed": supposed_username,
"usernameUnclaimed": non_exist_username,
"checkType": "message",
}
if headers != HEADERS:
site_data['headers'] = headers
site = MaigretSite(url_mainpage.split("/")[-1], site_data)
return site
async def submit_dialog(db, url_exists, cookie_file, logger):
domain_raw = URL_RE.sub("", url_exists).strip().strip("/")
domain_raw = domain_raw.split("/")[0]
logger.info('Domain is %s', domain_raw)
# check for existence
matched_sites = list(filter(lambda x: domain_raw in x.url_main + x.url, db.sites))
if matched_sites:
print(
f'Sites with domain "{domain_raw}" already exists in the Maigret database!'
)
status = lambda s: "(disabled)" if s.disabled else ""
url_block = lambda s: f"\n\t{s.url_main}\n\t{s.url}"
print(
"\n".join(
[
f"{site.name} {status(site)}{url_block(site)}"
for site in matched_sites
]
)
)
if input("Do you want to continue? [yN] ").lower() in "n":
return False
url_mainpage = extract_mainpage_url(url_exists)
print('Detecting site engine, please wait...')
sites = []
try:
sites = await detect_known_engine(db, url_exists, url_mainpage, logger)
except KeyboardInterrupt:
print('Engine detect process is interrupted.')
if not sites:
print("Unable to detect site engine, lets generate checking features")
sites = [
await check_features_manually(
db, url_exists, url_mainpage, cookie_file, logger
)
check_data = [
(site.username_claimed, QueryStatus.CLAIMED),
(site.username_unclaimed, QueryStatus.AVAILABLE),
]
logger.debug(sites[0].__dict__)
self.logger.info(f"Checking {site.name}...")
sem = asyncio.Semaphore(1)
print("Checking, please wait...")
found = False
chosen_site = None
for s in sites:
chosen_site = s
result = await site_self_check(s, logger, sem, db)
if not result["disabled"]:
found = True
break
if not found:
print(
f"Sorry, we couldn't find params to detect account presence/absence in {chosen_site.name}."
)
print(
"Try to run this mode again and increase features count or choose others."
)
return False
else:
if (
input(
f"Site {chosen_site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] "
for username, status in check_data:
results_dict = await maigret(
username=username,
site_dict={site.name: site},
logger=self.logger,
timeout=30,
id_type=site.type,
forced=True,
no_progressbar=True,
)
# don't disable entries with other ids types
# TODO: make normal checking
if site.name not in results_dict:
self.logger.info(results_dict)
changes["disabled"] = True
continue
result = results_dict[site.name]["status"]
site_status = result.status
if site_status != status:
if site_status == QueryStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
self.logger.warning(
"Error while searching '%s' in %s: %s, %s, check type %s",
username,
site.name,
result.context,
msgs,
etype,
)
# don't disable in case of available username
if status == QueryStatus.CLAIMED:
changes["disabled"] = True
elif status == QueryStatus.CLAIMED:
self.logger.warning(
f"Not found `{username}` in {site.name}, must be claimed"
)
self.logger.info(results_dict[site.name])
changes["disabled"] = True
else:
self.logger.warning(
f"Found `{username}` in {site.name}, must be available"
)
self.logger.info(results_dict[site.name])
changes["disabled"] = True
self.logger.info(f"Site {site.name} checking is finished")
return changes
def generate_additional_fields_dialog(self, engine: MaigretEngine, dialog):
fields = {}
if 'urlSubpath' in engine.site.get('url', ''):
msg = (
'Detected engine suppose additional URL subpath using (/forum/, /blog/, etc). '
'Enter in manually if it exists: '
)
subpath = input(msg).strip('/')
if subpath:
fields['urlSubpath'] = f'/{subpath}'
return fields
async def detect_known_engine(self, url_exists, url_mainpage) -> List[MaigretSite]:
try:
r = requests.get(url_mainpage)
self.logger.debug(r.text)
except Exception as e:
self.logger.warning(e)
print("Some error while checking main page")
return []
for engine in self.db.engines:
strs_to_check = engine.__dict__.get("presenseStrs")
if strs_to_check and r and r.text:
all_strs_in_response = True
for s in strs_to_check:
if s not in r.text:
all_strs_in_response = False
sites = []
if all_strs_in_response:
engine_name = engine.__dict__.get("name")
print(f"Detected engine {engine_name} for site {url_mainpage}")
usernames_to_check = self.settings.supposed_usernames
supposed_username = self.extract_username_dialog(url_exists)
if supposed_username:
usernames_to_check = [supposed_username] + usernames_to_check
add_fields = self.generate_additional_fields_dialog(
engine, url_exists
)
for u in usernames_to_check:
site_data = {
"urlMain": url_mainpage,
"name": url_mainpage.split("//")[1],
"engine": engine_name,
"usernameClaimed": u,
"usernameUnclaimed": "noonewouldeverusethis7",
**add_fields,
}
self.logger.info(site_data)
maigret_site = MaigretSite(
url_mainpage.split("/")[-1], site_data
)
maigret_site.update_from_engine(
self.db.engines_dict[engine_name]
)
sites.append(maigret_site)
return sites
return []
def extract_username_dialog(self, url):
url_parts = url.rstrip("/").split("/")
supposed_username = url_parts[-1].strip('@')
entered_username = input(
f'Is "{supposed_username}" a valid username? If not, write it manually: '
)
return entered_username if entered_username else supposed_username
async def check_features_manually(
self, url_exists, url_mainpage, cookie_file, redirects=False
):
custom_headers = {}
while True:
header_key = input(
'Specify custom header if you need or just press Enter to skip. Header name: '
)
if not header_key:
break
header_value = input('Header value: ')
custom_headers[header_key.strip()] = header_value.strip()
supposed_username = self.extract_username_dialog(url_exists)
non_exist_username = "noonewouldeverusethis7"
url_user = url_exists.replace(supposed_username, "{username}")
url_not_exists = url_exists.replace(supposed_username, non_exist_username)
headers = dict(self.HEADERS)
headers.update(custom_headers)
# cookies
cookie_dict = None
if cookie_file:
self.logger.info(f'Use {cookie_file} for cookies')
cookie_jar = import_aiohttp_cookies(cookie_file)
cookie_dict = {c.key: c.value for c in cookie_jar}
exists_resp = requests.get(
url_exists, cookies=cookie_dict, headers=headers, allow_redirects=redirects
)
self.logger.debug(url_exists)
self.logger.debug(exists_resp.status_code)
self.logger.debug(exists_resp.text)
non_exists_resp = requests.get(
url_not_exists,
cookies=cookie_dict,
headers=headers,
allow_redirects=redirects,
)
self.logger.debug(url_not_exists)
self.logger.debug(non_exists_resp.status_code)
self.logger.debug(non_exists_resp.text)
a = exists_resp.text
b = non_exists_resp.text
tokens_a = set(re.split(f'[{self.SEPARATORS}]', a))
tokens_b = set(re.split(f'[{self.SEPARATORS}]', b))
a_minus_b = tokens_a.difference(tokens_b)
b_minus_a = tokens_b.difference(tokens_a)
if len(a_minus_b) == len(b_minus_a) == 0:
print("The pages for existing and non-existing account are the same!")
top_features_count = int(
input(
f"Specify count of features to extract [default {self.TOP_FEATURES}]: "
)
or self.TOP_FEATURES
)
match_fun = get_match_ratio(self.settings.presence_strings)
presence_list = sorted(a_minus_b, key=match_fun, reverse=True)[
:top_features_count
]
print("Detected text features of existing account: " + ", ".join(presence_list))
features = input("If features was not detected correctly, write it manually: ")
if features:
presence_list = list(map(str.strip, features.split(",")))
absence_list = sorted(b_minus_a, key=match_fun, reverse=True)[
:top_features_count
]
print(
"Detected text features of non-existing account: " + ", ".join(absence_list)
)
features = input("If features was not detected correctly, write it manually: ")
if features:
absence_list = list(map(str.strip, features.split(",")))
site_data = {
"absenceStrs": absence_list,
"presenseStrs": presence_list,
"url": url_user,
"urlMain": url_mainpage,
"usernameClaimed": supposed_username,
"usernameUnclaimed": non_exist_username,
"checkType": "message",
}
if headers != self.HEADERS:
site_data['headers'] = headers
site = MaigretSite(url_mainpage.split("/")[-1], site_data)
return site
async def dialog(self, url_exists, cookie_file):
domain_raw = self.URL_RE.sub("", url_exists).strip().strip("/")
domain_raw = domain_raw.split("/")[0]
self.logger.info('Domain is %s', domain_raw)
# check for existence
matched_sites = list(
filter(lambda x: domain_raw in x.url_main + x.url, self.db.sites)
)
if matched_sites:
print(
f'Sites with domain "{domain_raw}" already exists in the Maigret database!'
)
status = lambda s: "(disabled)" if s.disabled else ""
url_block = lambda s: f"\n\t{s.url_main}\n\t{s.url}"
print(
"\n".join(
[
f"{site.name} {status(site)}{url_block(site)}"
for site in matched_sites
]
)
)
if input("Do you want to continue? [yN] ").lower() in "n":
return False
url_mainpage = self.extract_mainpage_url(url_exists)
print('Detecting site engine, please wait...')
sites = []
try:
sites = await self.detect_known_engine(url_exists, url_mainpage)
except KeyboardInterrupt:
print('Engine detect process is interrupted.')
if not sites:
print("Unable to detect site engine, lets generate checking features")
sites = [
await self.check_features_manually(
url_exists, url_mainpage, cookie_file
)
]
self.logger.debug(sites[0].__dict__)
sem = asyncio.Semaphore(1)
print("Checking, please wait...")
found = False
chosen_site = None
for s in sites:
chosen_site = s
result = await self.site_self_check(s, sem)
if not result["disabled"]:
found = True
break
if not found:
print(
f"Sorry, we couldn't find params to detect account presence/absence in {chosen_site.name}."
)
print(
"Try to run this mode again and increase features count or choose others."
)
.lower()
.strip("y")
):
return False
else:
if (
input(
f"Site {chosen_site.name} successfully checked. Do you want to save it in the Maigret DB? [Yn] "
)
.lower()
.strip("y")
):
return False
chosen_site.name = input("Change site name if you want: ") or chosen_site.name
chosen_site.tags = list(map(str.strip, input("Site tags: ").split(',')))
rank = get_alexa_rank(chosen_site.url_main)
if rank:
print(f'New alexa rank: {rank}')
chosen_site.alexa_rank = rank
chosen_site.name = input("Change site name if you want: ") or chosen_site.name
chosen_site.tags = list(map(str.strip, input("Site tags: ").split(',')))
rank = Submitter.get_alexa_rank(chosen_site.url_main)
if rank:
print(f'New alexa rank: {rank}')
chosen_site.alexa_rank = rank
logger.debug(chosen_site.json)
site_data = chosen_site.strip_engine_data()
logger.debug(site_data.json)
db.update_site(site_data)
return True
self.logger.debug(chosen_site.json)
site_data = chosen_site.strip_engine_data()
self.logger.debug(site_data.json)
self.db.update_site(site_data)
return True
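The core heuristic of check_features_manually() survives the move into the class unchanged: fetch the page for an existing and a non-existing account, split both responses on quote characters, and keep the tokens unique to each side as candidate presence/absence strings, later ranked by get_match_ratio against settings.presence_strings. The token-diff step in isolation, with made-up sample HTML:

import re

SEPARATORS = "\"'"
page_exists = '<meta name="username" content="alice"><h1>profile</h1>'
page_missing = '<div class="error">not found</div>'

tokens_a = set(re.split(f"[{SEPARATORS}]", page_exists))
tokens_b = set(re.split(f"[{SEPARATORS}]", page_missing))
print(tokens_a - tokens_b)  # candidate presenseStrs (unique to the existing page)
print(tokens_b - tokens_a)  # candidate absenceStrs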
+16
@@ -1,4 +1,5 @@
import ast
import difflib
import re
import random
from typing import Any
@@ -95,3 +96,18 @@ def get_dict_ascii_tree(items, prepend="", new_line=True):
def get_random_user_agent():
return random.choice(DEFAULT_USER_AGENTS)
def get_match_ratio(base_strs: list):
def get_match_inner(s: str):
return round(
max(
[
difflib.SequenceMatcher(a=s.lower(), b=s2.lower()).ratio()
for s2 in base_strs
]
),
2,
)
return get_match_inner
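get_match_ratio() is now a factory: it captures a base list (submit.py passes settings.presence_strings) and returns a scorer usable directly as a sort key:

from maigret.utils import get_match_ratio

match = get_match_ratio(["username", "profile", "not found"])
print(sorted(["user-name", "copyright", "profile_photo"], key=match, reverse=True))
# ['user-name', 'profile_photo', 'copyright']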
+3 -2
@@ -1,15 +1,16 @@
"""Maigret data test functions"""
from maigret.utils import is_country_tag
from maigret.sites import SUPPORTED_TAGS
def test_tags_validity(default_db):
unknown_tags = set()
tags = default_db._tags
for site in default_db.sites:
for tag in filter(lambda x: not is_country_tag(x), site.tags):
if tag not in SUPPORTED_TAGS:
if tag not in tags:
unknown_tags.add(tag)
assert unknown_tags == set()
+24
@@ -1,5 +1,6 @@
"""Maigret Database test functions"""
from maigret.sites import MaigretDatabase, MaigretSite
from maigret.utils import URLMatcher
EXAMPLE_DB = {
'engines': {
@@ -179,3 +180,26 @@ def test_ranked_sites_dict_id_type():
assert len(db.ranked_sites_dict()) == 2
assert len(db.ranked_sites_dict(id_type='username')) == 2
assert len(db.ranked_sites_dict(id_type='gaia_id')) == 1
def test_get_url_template():
site = MaigretSite(
"test",
{
"urlMain": "https://ya.ru/",
"url": "{urlMain}{urlSubpath}/members/?username={username}",
},
)
assert (
site.get_url_template()
== "{urlMain}{urlSubpath}/members/?username={username} (no engine)"
)
site = MaigretSite(
"test",
{
"urlMain": "https://ya.ru/",
"url": "https://{username}.ya.ru",
},
)
assert site.get_url_template() == "SUBDOMAIN"
+7
@@ -8,6 +8,7 @@ from maigret.utils import (
enrich_link_str,
URLMatcher,
get_dict_ascii_tree,
get_match_ratio,
)
@@ -136,3 +137,9 @@ def test_get_dict_ascii_tree():
instagram_username: Street.Reality.Photography
twitter_username: Alexaimephotogr"""
)
def test_get_match_ratio():
fun = get_match_ratio(["test", "maigret", "username"])
assert fun("test") == 1