Merge pull request #48 from soxoj/filter-fixes

Improved sites list filtering, pretty messages
This commit is contained in:
soxoj
2021-02-06 18:21:22 +03:00
committed by GitHub
5 changed files with 98 additions and 50 deletions
+36 -30
View File
@@ -805,13 +805,20 @@ async def main():
if args.top_sites == 0 or args.all_sites:
args.top_sites = sys.maxsize
# Create notify object for query results.
query_notify = QueryNotifyPrint(result=None,
verbose=args.verbose,
print_found_only=not args.print_not_found,
skip_check_errors=not args.print_check_errors,
color=not args.no_color)
# Create object with all information about sites we are aware of.
try:
db = MaigretDatabase().load_from_file(args.json_file)
site_data = db.ranked_sites_dict(top=args.top_sites, tags=args.tags, names=args.site_list)
except Exception as error:
print(f"ERROR: {error}")
sys.exit(1)
db = MaigretDatabase().load_from_file(args.json_file)
get_top_sites_for_id = lambda x: db.ranked_sites_dict(top=args.top_sites, tags=args.tags,
names=args.site_list,
disabled=False, id_type=x)
site_data = get_top_sites_for_id(args.id_type)
# Database self-checking
if args.self_check:
@@ -832,28 +839,25 @@ async def main():
# Define one report filename template
report_filepath_tpl = os.path.join(args.folderoutput, 'report_{username}{postfix}')
# Database consistency
enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values())))
print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
# Database stats
# TODO: verbose info about filtered sites
# enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values())))
# print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
if not enabled_count:
print('No sites to check, exiting!')
sys.exit(2)
if usernames == ['-']:
if usernames == {}:
# magic params to exit after init
print('No usernames to check, exiting.')
query_notify.warning('No usernames to check, exiting.')
sys.exit(0)
# Create notify object for query results.
query_notify = QueryNotifyPrint(result=None,
verbose=args.verbose,
print_found_only=not args.print_not_found,
skip_check_errors=not args.print_check_errors,
color=not args.no_color)
if not site_data:
query_notify.warning('No sites to check, exiting!')
sys.exit(2)
else:
query_notify.warning(f'Starting a search on top {len(site_data)} sites from the Maigret database...')
if not args.all_sites:
query_notify.warning(f'You can run search by full list of sites with flag `-a`', '!')
already_checked = set()
general_results = []
while usernames:
@@ -870,11 +874,13 @@ async def main():
if found_unsupported_chars:
pretty_chars_str = ','.join(map(lambda s: f'"{s}"', found_unsupported_chars))
print(f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"')
query_notify.warning(f'Found unsupported URL characters: {pretty_chars_str}, skip search by username "{username}"')
continue
sites_to_check = get_top_sites_for_id(id_type)
results = await maigret(username,
dict(site_data),
dict(sites_to_check),
query_notify,
proxy=args.proxy,
timeout=args.timeout,
@@ -905,22 +911,22 @@ async def main():
if args.xmind:
filename = report_filepath_tpl.format(username=username, postfix='.xmind')
save_xmind_report(filename, username, results)
print(f'XMind report for {username} saved in {filename}')
query_notify.warning(f'XMind report for {username} saved in {filename}')
if args.csv:
filename = report_filepath_tpl.format(username=username, postfix='.csv')
save_csv_report(filename, username, results)
print(f'CSV report for {username} saved in {filename}')
query_notify.warning(f'CSV report for {username} saved in {filename}')
if args.txt:
filename = report_filepath_tpl.format(username=username, postfix='.txt')
save_txt_report(filename, username, results)
print(f'TXT report for {username} saved in {filename}')
query_notify.warning(f'TXT report for {username} saved in {filename}')
# reporting for all the results
if general_results:
if args.html or args.pdf:
print('Generating report info...')
query_notify.warning('Generating report info...')
report_context = generate_report_context(general_results)
# determine main username
username = report_context['username']
@@ -928,12 +934,12 @@ async def main():
if args.html:
filename = report_filepath_tpl.format(username=username, postfix='.html')
save_html_report(filename, report_context)
print(f'HTML report on all usernames saved in {filename}')
query_notify.warning(f'HTML report on all usernames saved in {filename}')
if args.pdf:
filename = report_filepath_tpl.format(username=username, postfix='.pdf')
save_pdf_report(filename, report_context)
print(f'PDF report on all usernames saved in {filename}')
query_notify.warning(f'PDF report on all usernames saved in {filename}')
# update database
db.save_to_file(args.json_file)
+6 -1
View File
@@ -168,7 +168,12 @@ class QueryNotifyPrint(QueryNotify):
else:
print(f"[*] {title} {message} on:")
return
def warning(self, message, symbol='-'):
    """
    Print a service message prefixed with a bracketed marker.

    Args:
        message: Text to print after the marker.
        symbol:  Character(s) placed inside the square brackets;
                 defaults to '-'.

    When ``self.color`` is truthy the line is emitted in bright yellow,
    otherwise it is printed as plain text.
    """
    text = f'[{symbol}] {message}'
    # The colour prefix is only evaluated when colouring is enabled.
    print(Style.BRIGHT + Fore.YELLOW + text if self.color else text)
def get_additional_data_text(self, items, prepend=''):
text = ''
+8 -3
View File
@@ -10372,7 +10372,12 @@
"us"
],
"checkType": "message",
"absenceStrs": "The page you are looking for doesn\u2019t exist",
"presenseStrs": [
"{\"username\""
],
"absenceStrs": [
"We seem to have lost this page"
],
"alexaRank": 12727,
"url": "https://www.producthunt.com/@{username}",
"urlMain": "https://www.producthunt.com/",
@@ -13562,7 +13567,7 @@
"sec-ch-ua": "Google Chrome\";v=\"87\", \" Not;A Brand\";v=\"99\", \"Chromium\";v=\"87\"",
"authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
"x-guest-token": "1357438625504518145"
"x-guest-token": "1358064134064140290"
},
"errors": {
"Bad guest token": "x-guest-token update required"
@@ -13929,7 +13934,7 @@
"video"
],
"headers": {
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTI0NzQ1MDAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.KmVN4YyuyqhUo8xr006lpL5k3_Uj2Y_ygk2r8cEO9Qo"
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTI2MjQ4NjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.kgp8r380d1aDWcd-ROncr0Tqf8EdA-l35EeEY9is6TI"
},
"activation": {
"url": "https://vimeo.com/_rv/viewer",
+15 -10
View File
@@ -140,22 +140,27 @@ class MaigretDatabase:
def sites_dict(self):
    """Return a dict mapping every stored site's name to the site object."""
    by_name = {}
    for entry in self._sites:
        # A later site with the same name replaces an earlier one.
        by_name[entry.name] = entry
    return by_name
def ranked_sites_dict(self, reverse=False, top=sys.maxsize, tags=None, names=None,
                      disabled=True, id_type='username'):
    """
    Rank and filter the list of known sites.

    All filters are combined with AND; an empty/omitted filter matches
    every site.

    Args:
        reverse:  Sort by ``alexa_rank`` in descending order when True.
        top:      Maximum number of sites to return after sorting.
        tags:     Tags or engine names to select (matched case-insensitively);
                  a site passes if its engine or any of its tags is listed.
        names:    Site names to select (matched case-insensitively).
        disabled: Include disabled sites when True. Disabled sites are also
                  included when the literal tag 'disabled' is passed in ``tags``.
        id_type:  Only sites whose ``type`` equals this value are returned.

    Returns:
        dict mapping site name to site object, ordered by rank.
    """
    tags = tags or []
    names = names or []

    # Normalise once, as sets, so per-site membership checks stay cheap.
    wanted_names = {name.lower() for name in names}
    wanted_tags = {tag.lower() for tag in tags}
    include_disabled = disabled or 'disabled' in tags

    def matches_tags_or_engine(site):
        # No tag filter requested: every site passes.
        if not tags:
            return True
        engine = site.engine
        if isinstance(engine, str) and engine.lower() in wanted_tags:
            return True
        return bool(wanted_tags.intersection(site.tags))

    def is_selected(site):
        return (
            matches_tags_or_engine(site)
            and (not names or site.name.lower() in wanted_names)
            and (include_disabled or not site.disabled)
            and site.type == id_type
        )

    selected = [site for site in self.sites if is_selected(site)]
    ranked = sorted(selected, key=lambda site: site.alexa_rank, reverse=reverse)[:top]
    return {site.name: site for site in ranked}
+33 -6
View File
@@ -131,13 +131,40 @@ def test_ranked_sites_dict():
# filtering by engine
assert list(db.ranked_sites_dict(tags=['ucoz']).keys()) == ['3']
# disjunction
assert list(db.ranked_sites_dict(names=['2'], tags=['forum']).keys()) == ['2']
assert list(db.ranked_sites_dict(names=['2'], tags=['ucoz']).keys()) == []
assert list(db.ranked_sites_dict(names=['4'], tags=['ru']).keys()) == []
# reverse
assert list(db.ranked_sites_dict(reverse=True).keys()) == ['3', '2', '1']
def test_ranked_sites_dict_names():
    """Filtering by site names returns exactly the requested sites."""
    db = MaigretDatabase()
    # Sites are registered out of rank order on purpose.
    for site_name, rank in (('3', 30), ('1', 2), ('2', 10)):
        db.update_site(MaigretSite(site_name, {'alexaRank': rank}))

    # filtering by names
    first_two = db.ranked_sites_dict(names=['1', '2'])
    assert list(first_two.keys()) == ['1', '2']
    last_two = db.ranked_sites_dict(names=['2', '3'])
    assert list(last_two.keys()) == ['2', '3']
# disjunction
assert list(db.ranked_sites_dict(names=['2'], tags=['forum']).keys()) == ['1', '2']
assert list(db.ranked_sites_dict(names=['2'], tags=['forum'], reverse=True).keys()) == ['2', '1']
assert list(db.ranked_sites_dict(names=['2'], tags=['ucoz']).keys()) == ['2', '3']
assert list(db.ranked_sites_dict(names=['4'], tags=['ru']).keys()) == ['2']
assert list(db.ranked_sites_dict(names=['4'], tags=['nosuchtag']).keys()) == []
def test_ranked_sites_dict_disabled():
    """Disabled sites are listed by default and dropped with ``disabled=False``."""
    db = MaigretDatabase()
    for site_name, options in (('1', {'disabled': True}), ('2', {})):
        db.update_site(MaigretSite(site_name, options))

    with_disabled = db.ranked_sites_dict()
    assert len(with_disabled) == 2

    enabled_only = db.ranked_sites_dict(disabled=False)
    assert len(enabled_only) == 1
def test_ranked_sites_dict_id_type():
    """Only sites whose identifier type matches ``id_type`` are returned."""
    db = MaigretDatabase()
    site_options = (
        ('1', {}),  # no explicit type; the assertions below expect it to count as 'username'
        ('2', {'type': 'username'}),
        ('3', {'type': 'gaia_id'}),
    )
    for site_name, options in site_options:
        db.update_site(MaigretSite(site_name, options))

    default_selection = db.ranked_sites_dict()
    assert len(default_selection) == 2

    username_selection = db.ranked_sites_dict(id_type='username')
    assert len(username_selection) == 2

    gaia_selection = db.ranked_sites_dict(id_type='gaia_id')
    assert len(gaia_selection) == 1