mirror of
https://github.com/soxoj/maigret.git
synced 2026-05-06 22:19:01 +00:00
Improved sites list filtering, pretty messages
This commit is contained in:
+36
-30
@@ -805,13 +805,20 @@ async def main():
|
||||
if args.top_sites == 0 or args.all_sites:
|
||||
args.top_sites = sys.maxsize
|
||||
|
||||
# Create notify object for query results.
|
||||
query_notify = QueryNotifyPrint(result=None,
|
||||
verbose=args.verbose,
|
||||
print_found_only=not args.print_not_found,
|
||||
skip_check_errors=not args.print_check_errors,
|
||||
color=not args.no_color)
|
||||
|
||||
# Create object with all information about sites we are aware of.
|
||||
try:
|
||||
db = MaigretDatabase().load_from_file(args.json_file)
|
||||
site_data = db.ranked_sites_dict(top=args.top_sites, tags=args.tags, names=args.site_list)
|
||||
except Exception as error:
|
||||
print(f"ERROR: {error}")
|
||||
sys.exit(1)
|
||||
db = MaigretDatabase().load_from_file(args.json_file)
|
||||
get_top_sites_for_id = lambda x: db.ranked_sites_dict(top=args.top_sites, tags=args.tags,
|
||||
names=args.site_list,
|
||||
disabled=False, id_type=x)
|
||||
|
||||
site_data = get_top_sites_for_id(args.id_type)
|
||||
|
||||
# Database self-checking
|
||||
if args.self_check:
|
||||
@@ -832,28 +839,25 @@ async def main():
|
||||
# Define one report filename template
|
||||
report_filepath_tpl = os.path.join(args.folderoutput, 'report_{username}{postfix}')
|
||||
|
||||
# Database consistency
|
||||
enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values())))
|
||||
print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
|
||||
# Database stats
|
||||
# TODO: verbose info about filtered sites
|
||||
# enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values())))
|
||||
# print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
|
||||
|
||||
if not enabled_count:
|
||||
print('No sites to check, exiting!')
|
||||
sys.exit(2)
|
||||
|
||||
if usernames == ['-']:
|
||||
if usernames == {}:
|
||||
# magic params to exit after init
|
||||
print('No usernames to check, exiting.')
|
||||
query_notify.warning('No usernames to check, exiting.')
|
||||
sys.exit(0)
|
||||
|
||||
# Create notify object for query results.
|
||||
query_notify = QueryNotifyPrint(result=None,
|
||||
verbose=args.verbose,
|
||||
print_found_only=not args.print_not_found,
|
||||
skip_check_errors=not args.print_check_errors,
|
||||
color=not args.no_color)
|
||||
if not site_data:
|
||||
query_notify.warning('No sites to check, exiting!')
|
||||
sys.exit(2)
|
||||
else:
|
||||
query_notify.warning(f'Starting a search on top {len(site_data)} sites from the Maigret database...')
|
||||
if not args.all_sites:
|
||||
query_notify.warning(f'You can run search by full list of sites with flag `-a`', '!')
|
||||
|
||||
already_checked = set()
|
||||
|
||||
general_results = []
|
||||
|
||||
while usernames:
|
||||
@@ -870,11 +874,13 @@ async def main():
|
||||
|
||||
if found_unsupported_chars:
|
||||
pretty_chars_str = ','.join(map(lambda s: f'"{s}"', found_unsupported_chars))
|
||||
print(f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"')
|
||||
query_notify.warning(f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"')
|
||||
continue
|
||||
|
||||
sites_to_check = get_top_sites_for_id(id_type)
|
||||
|
||||
results = await maigret(username,
|
||||
dict(site_data),
|
||||
dict(sites_to_check),
|
||||
query_notify,
|
||||
proxy=args.proxy,
|
||||
timeout=args.timeout,
|
||||
@@ -905,22 +911,22 @@ async def main():
|
||||
if args.xmind:
|
||||
filename = report_filepath_tpl.format(username=username, postfix='.xmind')
|
||||
save_xmind_report(filename, username, results)
|
||||
print(f'XMind report for {username} saved in {filename}')
|
||||
query_notify.warning(f'XMind report for {username} saved in {filename}')
|
||||
|
||||
if args.csv:
|
||||
filename = report_filepath_tpl.format(username=username, postfix='.csv')
|
||||
save_csv_report(filename, username, results)
|
||||
print(f'CSV report for {username} saved in {filename}')
|
||||
query_notify.warning(f'CSV report for {username} saved in {filename}')
|
||||
|
||||
if args.txt:
|
||||
filename = report_filepath_tpl.format(username=username, postfix='.txt')
|
||||
save_txt_report(filename, username, results)
|
||||
print(f'TXT report for {username} saved in {filename}')
|
||||
query_notify.warning(f'TXT report for {username} saved in {filename}')
|
||||
|
||||
# reporting for all the result
|
||||
if general_results:
|
||||
if args.html or args.pdf:
|
||||
print('Generating report info...')
|
||||
query_notify.warning('Generating report info...')
|
||||
report_context = generate_report_context(general_results)
|
||||
# determine main username
|
||||
username = report_context['username']
|
||||
@@ -928,12 +934,12 @@ async def main():
|
||||
if args.html:
|
||||
filename = report_filepath_tpl.format(username=username, postfix='.html')
|
||||
save_html_report(filename, report_context)
|
||||
print(f'HTML report on all usernames saved in {filename}')
|
||||
query_notify.warning(f'HTML report on all usernames saved in {filename}')
|
||||
|
||||
if args.pdf:
|
||||
filename = report_filepath_tpl.format(username=username, postfix='.pdf')
|
||||
save_pdf_report(filename, report_context)
|
||||
print(f'PDF report on all usernames saved in {filename}')
|
||||
query_notify.warning(f'PDF report on all usernames saved in {filename}')
|
||||
# update database
|
||||
db.save_to_file(args.json_file)
|
||||
|
||||
|
||||
+6
-1
@@ -168,7 +168,12 @@ class QueryNotifyPrint(QueryNotify):
|
||||
else:
|
||||
print(f"[*] {title} {message} on:")
|
||||
|
||||
return
|
||||
def warning(self, message, symbol='-'):
    """
    Print a one-line notice formatted as `[symbol] message`.

    When `self.color` is truthy the line is prefixed with the bright and
    yellow style codes; otherwise the plain text is printed.
    """
    text = '[{}] {}'.format(symbol, message)
    # Style/Fore are only touched when colored output is requested
    prefix = Style.BRIGHT + Fore.YELLOW if self.color else ''
    print(prefix + text)
|
||||
|
||||
def get_additional_data_text(self, items, prepend=''):
|
||||
text = ''
|
||||
|
||||
@@ -10372,7 +10372,12 @@
|
||||
"us"
|
||||
],
|
||||
"checkType": "message",
|
||||
"absenceStrs": "The page you are looking for doesn\u2019t exist",
|
||||
"presenseStrs": [
|
||||
"{\"username\""
|
||||
],
|
||||
"absenceStrs": [
|
||||
"We seem to have lost this page"
|
||||
],
|
||||
"alexaRank": 12727,
|
||||
"url": "https://www.producthunt.com/@{username}",
|
||||
"urlMain": "https://www.producthunt.com/",
|
||||
@@ -13562,7 +13567,7 @@
|
||||
"sec-ch-ua": "Google Chrome\";v=\"87\", \" Not;A Brand\";v=\"99\", \"Chromium\";v=\"87\"",
|
||||
"authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA",
|
||||
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
|
||||
"x-guest-token": "1357438625504518145"
|
||||
"x-guest-token": "1358064134064140290"
|
||||
},
|
||||
"errors": {
|
||||
"Bad guest token": "x-guest-token update required"
|
||||
@@ -13929,7 +13934,7 @@
|
||||
"video"
|
||||
],
|
||||
"headers": {
|
||||
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTI0NzQ1MDAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.KmVN4YyuyqhUo8xr006lpL5k3_Uj2Y_ygk2r8cEO9Qo"
|
||||
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTI2MjQ4NjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.kgp8r380d1aDWcd-ROncr0Tqf8EdA-l35EeEY9is6TI"
|
||||
},
|
||||
"activation": {
|
||||
"url": "https://vimeo.com/_rv/viewer",
|
||||
|
||||
+15
-10
@@ -140,22 +140,27 @@ class MaigretDatabase:
|
||||
def sites_dict(self):
|
||||
return {site.name: site for site in self._sites}
|
||||
|
||||
def ranked_sites_dict(self, reverse=False, top=sys.maxsize, tags=None, names=None,
                      disabled=True, id_type='username'):
    """
    Ranking and filtering of the sites list.

    A site from `self.sites` is kept only if it passes every active filter:

    - tags: when given, the site's engine (lower-cased) or at least one of
      the site's tags must be among the requested tags (lower-cased);
    - names: when given, the site's lower-cased name must be among the
      requested names (lower-cased);
    - disabled: disabled sites are kept only if this flag is true or the
      pseudo-tag 'disabled' is among the requested tags;
    - id_type: the site's `type` must be equal to this value.

    `None` or an empty sequence for `tags` / `names` switches the
    corresponding filter off.

    The remaining sites are sorted by `alexa_rank` (descending when
    `reverse` is true) and only the first `top` of them are returned.

    Returns a dict mapping site name to site object, in ranked order.
    """
    # Sets are built once so every per-site membership test is O(1).
    wanted_names = set(map(str.lower, names or ()))
    wanted_tags = set(map(str.lower, tags or ()))

    # The 'disabled' pseudo-tag is matched case-insensitively, like any
    # other requested tag.
    include_disabled = disabled or 'disabled' in wanted_tags

    def matches_tags(site):
        # No requested tags means no restriction by tag or engine.
        if not wanted_tags:
            return True
        engine = site.engine
        if isinstance(engine, str) and engine.lower() in wanted_tags:
            return True
        return not wanted_tags.isdisjoint(site.tags)

    def is_selected(site):
        if site.disabled and not include_disabled:
            return False
        if site.type != id_type:
            return False
        if wanted_names and site.name.lower() not in wanted_names:
            return False
        return matches_tags(site)

    ranked = sorted(filter(is_selected, self.sites),
                    key=lambda site: site.alexa_rank, reverse=reverse)
    return {site.name: site for site in ranked[:top]}
|
||||
|
||||
+33
-6
@@ -131,13 +131,40 @@ def test_ranked_sites_dict():
|
||||
# filtering by engine
|
||||
assert list(db.ranked_sites_dict(tags=['ucoz']).keys()) == ['3']
|
||||
|
||||
# disjunction
|
||||
assert list(db.ranked_sites_dict(names=['2'], tags=['forum']).keys()) == ['2']
|
||||
assert list(db.ranked_sites_dict(names=['2'], tags=['ucoz']).keys()) == []
|
||||
assert list(db.ranked_sites_dict(names=['4'], tags=['ru']).keys()) == []
|
||||
|
||||
# reverse
|
||||
assert list(db.ranked_sites_dict(reverse=True).keys()) == ['3', '2', '1']
|
||||
|
||||
|
||||
def test_ranked_sites_dict_names():
    """Check that ranked_sites_dict(names=...) returns exactly the requested sites."""
    db = MaigretDatabase()
    for site_name, rank in (('3', 30), ('1', 2), ('2', 10)):
        db.update_site(MaigretSite(site_name, {'alexaRank': rank}))

    # filtering by names
    first_pair = db.ranked_sites_dict(names=['1', '2'])
    assert list(first_pair.keys()) == ['1', '2']

    second_pair = db.ranked_sites_dict(names=['2', '3'])
    assert list(second_pair.keys()) == ['2', '3']
|
||||
|
||||
# disjunction
|
||||
assert list(db.ranked_sites_dict(names=['2'], tags=['forum']).keys()) == ['1', '2']
|
||||
assert list(db.ranked_sites_dict(names=['2'], tags=['forum'], reverse=True).keys()) == ['2', '1']
|
||||
assert list(db.ranked_sites_dict(names=['2'], tags=['ucoz']).keys()) == ['2', '3']
|
||||
assert list(db.ranked_sites_dict(names=['4'], tags=['ru']).keys()) == ['2']
|
||||
assert list(db.ranked_sites_dict(names=['4'], tags=['nosuchtag']).keys()) == []
|
||||
|
||||
def test_ranked_sites_dict_disabled():
    """Check that disabled=False drops the disabled site while the default keeps it."""
    db = MaigretDatabase()
    for site_name, info in (('1', {'disabled': True}), ('2', {})):
        db.update_site(MaigretSite(site_name, info))

    with_disabled = db.ranked_sites_dict()
    assert len(with_disabled) == 2

    enabled_only = db.ranked_sites_dict(disabled=False)
    assert len(enabled_only) == 1
|
||||
|
||||
def test_ranked_sites_dict_id_type():
    """Check how many sites ranked_sites_dict returns for each id_type value."""
    db = MaigretDatabase()
    fixtures = (
        ('1', {}),
        ('2', {'type': 'username'}),
        ('3', {'type': 'gaia_id'}),
    )
    for site_name, info in fixtures:
        db.update_site(MaigretSite(site_name, info))

    default_selection = db.ranked_sites_dict()
    assert len(default_selection) == 2

    username_selection = db.ranked_sites_dict(id_type='username')
    assert len(username_selection) == 2

    gaia_selection = db.ranked_sites_dict(id_type='gaia_id')
    assert len(gaia_selection) == 1
|
||||
|
||||
Reference in New Issue
Block a user