Reformat code, some sites added

Soxoj
2021-03-19 01:48:20 +03:00
parent 940f408da3
commit 908176be85
14 changed files with 194 additions and 101 deletions
+2 -4
@@ -1,11 +1,9 @@
import aiohttp
from aiohttp import CookieJar
import asyncio
import json
from http.cookiejar import MozillaCookieJar
from http.cookies import Morsel
import requests
from aiohttp import CookieJar
class ParsingActivator:
@staticmethod
+6 -2
@@ -467,8 +467,12 @@ async def maigret(username, site_dict, query_notify, logger,
     if no_progressbar:
         await asyncio.gather(*tasks)
     else:
-        for f in tqdm.asyncio.tqdm.as_completed(tasks):
-            await f
+        for f in tqdm.asyncio.tqdm.as_completed(tasks, timeout=timeout):
+            try:
+                await f
+            except asyncio.exceptions.TimeoutError:
+                # TODO: write timeout to results
+                pass

     await session.close()
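For readers of this hunk: tqdm.asyncio.tqdm.as_completed() forwards its timeout argument to asyncio.as_completed(), so once the deadline passes, awaiting the next yielded future raises asyncio.TimeoutError, which the new try/except swallows. A minimal standalone sketch of that pattern (the coroutine below is made up for illustration, not taken from the repository):

import asyncio
import tqdm.asyncio

async def fake_check(delay):
    # Stand-in for a single site check; only its duration matters here.
    await asyncio.sleep(delay)
    return delay

async def main():
    tasks = [asyncio.create_task(fake_check(d)) for d in (0.1, 0.2, 5.0)]
    # as_completed() yields awaitables in completion order and drives the progress bar;
    # after 1 second the remaining awaits raise TimeoutError instead of blocking.
    for f in tqdm.asyncio.tqdm.as_completed(tasks, timeout=1.0):
        try:
            print(await f)
        except asyncio.TimeoutError:
            pass  # the 5-second task misses the deadline; a real run would record this

asyncio.run(main())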
+2 -4
@@ -4,7 +4,6 @@ Maigret main module
import os
import platform
import sys
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import requests
@@ -176,7 +175,7 @@ async def main():
action="store", metavar='REPORT_TYPE',
dest="json", default='', type=check_supported_json_format,
help=f"Generate a JSON report of specific type: {', '.join(SUPPORTED_JSON_REPORT_FORMATS)}"
" (one report per username)."
" (one report per username)."
)
args = parser.parse_args()
@@ -204,7 +203,7 @@ async def main():
u: args.id_type
for u in args.username
if u not in ['-']
-and u not in args.ignore_ids_list
+and u not in args.ignore_ids_list
}
parsing_enabled = not args.disable_extracting
@@ -380,7 +379,6 @@ async def main():
save_json_report(filename, username, results, report_type=args.json)
query_notify.warning(f'JSON {args.json} report for {username} saved in {filename}')
# reporting for all the result
if general_results:
if args.html or args.pdf:
+1
@@ -4,6 +4,7 @@ This module defines the objects for notifying the caller about the
results of queries.
"""
import sys
from colorama import Fore, Style, init
from .result import QueryStatus
+25 -15
@@ -1,15 +1,16 @@
import csv
import json
import io
import json
import logging
import os
from argparse import ArgumentTypeError
from datetime import datetime
import pycountry
import xmind
from datetime import datetime
from dateutil.parser import parse as parse_datetime_str
from jinja2 import Template
from xhtml2pdf import pisa
from argparse import ArgumentTypeError
from dateutil.parser import parse as parse_datetime_str
from .result import QueryStatus
from .utils import is_country_tag, CaseConverter, enrich_link_str
@@ -19,10 +20,11 @@ SUPPORTED_JSON_REPORT_FORMATS = [
'ndjson',
]
'''
UTILS
'''
def filter_supposed_data(data):
### interesting fields
allowed_fields = ['fullname', 'gender', 'location', 'age']
@@ -35,6 +37,8 @@ def filter_supposed_data(data):
'''
REPORTS SAVING
'''
def save_csv_report(filename: str, username: str, results: dict):
with open(filename, 'w', newline='', encoding='utf-8') as f:
generate_csv_report(username, results, f)
@@ -58,6 +62,7 @@ def save_pdf_report(filename: str, context: dict):
with open(filename, 'w+b') as f:
pisa.pisaDocument(io.StringIO(filled_template), dest=f, default_css=css)
def save_json_report(filename: str, username: str, results: dict, report_type: str):
with open(filename, 'w', encoding='utf-8') as f:
generate_json_report(username, results, f, report_type=report_type)
@@ -66,10 +71,13 @@ def save_json_report(filename: str, username: str, results: dict, report_type: s
'''
REPORTS GENERATING
'''
def generate_report_template(is_pdf: bool):
"""
HTML/PDF template generation
"""
def get_resource_content(filename):
return open(os.path.join(maigret_path, 'resources', filename)).read()
@@ -112,6 +120,9 @@ def generate_report_context(username_results: list):
continue
status = dictionary.get('status')
+if not status: # FIXME: currently in case of timeout
+    continue
if status.ids_data:
dictionary['ids_data'] = status.ids_data
extended_info_count += 1
@@ -166,7 +177,6 @@ def generate_report_context(username_results: list):
for t in status.tags:
tags[t] = tags.get(t, 0) + 1
brief_text.append(f'Search by {id_type} {username} returned {found_accounts} accounts.')
if new_ids:
@@ -177,8 +187,6 @@ def generate_report_context(username_results: list):
brief_text.append(f'Extended info extracted from {extended_info_count} accounts.')
brief = ' '.join(brief_text).strip()
tuple_sort = lambda d: sorted(d, key=lambda x: x[1], reverse=True)
@@ -221,7 +229,7 @@ def generate_csv_report(username: str, results: dict, csvfile):
results[site]['url_user'],
str(results[site]['status'].status),
results[site]['http_status'],
-])
+])
def generate_txt_report(username: str, results: dict, file):
@@ -253,16 +261,19 @@ def generate_json_report(username: str, results: dict, file, report_type):
if is_report_per_line:
data['sitename'] = sitename
-file.write(json.dumps(data)+'\n')
+file.write(json.dumps(data) + '\n')
else:
all_json[sitename] = data
if not is_report_per_line:
file.write(json.dumps(all_json))
'''
XMIND 8 Functions
'''
def save_xmind_report(filename, username, results):
if os.path.exists(filename):
os.remove(filename)
@@ -277,9 +288,9 @@ def design_sheet(sheet, username, results):
alltags = {}
supposed_data = {}
sheet.setTitle("%s Analysis"%(username))
sheet.setTitle("%s Analysis" % (username))
root_topic1 = sheet.getRootTopic()
root_topic1.setTitle("%s"%(username))
root_topic1.setTitle("%s" % (username))
undefinedsection = root_topic1.addSubTopic()
undefinedsection.setTitle("Undefined")
@@ -333,7 +344,7 @@ def design_sheet(sheet, username, results):
currentsublabel.setTitle("%s: %s" % (k, currentval))
### Add Supposed DATA
filterede_supposed_data = filter_supposed_data(supposed_data)
-if(len(filterede_supposed_data) >0):
+if (len(filterede_supposed_data) > 0):
undefinedsection = root_topic1.addSubTopic()
undefinedsection.setTitle("SUPPOSED DATA")
for k, v in filterede_supposed_data.items():
@@ -344,6 +355,5 @@ def design_sheet(sheet, username, results):
def check_supported_json_format(value):
if value and not value in SUPPORTED_JSON_REPORT_FORMATS:
raise ArgumentTypeError(f'JSON report type must be one of the following types: '
+ ', '.join(SUPPORTED_JSON_REPORT_FORMATS))
+ ', '.join(SUPPORTED_JSON_REPORT_FORMATS))
return value
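Since the hunks above touch the per-line ('ndjson') branch of the JSON report writer, here is a rough standalone sketch of what one-object-per-line output means for consumers; the file name and record fields are illustrative, not the actual report schema:

import json

# Writing: one JSON document per line, as in the is_report_per_line branch above.
records = [
    {"sitename": "ExampleSite", "username": "alex", "status": "Claimed"},
    {"sitename": "AnotherSite", "username": "alex", "status": "Available"},
]
with open("report_alex_ndjson.json", "w", encoding="utf-8") as f:
    for data in records:
        f.write(json.dumps(data) + "\n")

# Reading it back is a line-by-line json.loads(); no need to hold the whole report in memory.
with open("report_alex_ndjson.json", encoding="utf-8") as f:
    for line in f:
        print(json.loads(line)["sitename"])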
+82 -2
@@ -12349,7 +12349,7 @@
"us"
],
"headers": {
"authorization": "Bearer BQCEWXdzCPImYp4zhhbEssMRKqvUasJb9vVoe2A3J5eFMhTfn0b5jPkUHGJ9Fe0_HCaF81AMeRnSD9KzIPg"
"authorization": "Bearer BQA6sdhtUg3hadjln7DCoAK6sLn7KrHfsn2DObW2gr-W3HgF0h1KZGVYgwispRDR1tqRntVeTd0Duvb2q4g"
},
"errors": {
"Spotify is currently not available in your country.": "Access denied in your country, use proxy/vpn"
@@ -14062,7 +14062,7 @@
"video"
],
"headers": {
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYwOTgwODAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.tTecsUjIJ0KCcMxOT8OgkCp-P3ezg5RR0FGqtiejqE8"
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTYxMDcyNjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbH0.kzWxBf1qCJwjpZYUP6w-Pf4VptBMKpKUaMw8VnYwtPU"
},
"activation": {
"url": "https://vimeo.com/_rv/viewer",
@@ -14969,6 +14969,7 @@
"usernameUnclaimed": "noonewouldeverusethis7"
},
"YandexLocal": {
"disabled": true,
"tags": [
"ru"
],
@@ -23595,6 +23596,67 @@
"urlMain": "https://calendly.com",
"usernameClaimed": "john",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"depop.com": {
"checkType": "message",
"presenseStrs": [
"first_name"
],
"absenceStrs": [
"invalidUrlError__message"
],
"url": "https://www.depop.com/{username}",
"urlMain": "https://www.depop.com",
"usernameClaimed": "blue",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"community.brave.com": {
"engine": "Discourse",
"urlMain": "https://community.brave.com",
"usernameClaimed": "alex",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"community.endlessos.com": {
"engine": "Discourse",
"urlMain": "https://community.endlessos.com",
"usernameClaimed": "alex",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"forum.endeavouros.com": {
"engine": "Discourse",
"urlMain": "https://forum.endeavouros.com",
"usernameClaimed": "alex",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"forum.garudalinux.org": {
"engine": "Discourse",
"urlMain": "https://forum.garudalinux.org",
"usernameClaimed": "alex",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"forum.snapcraft.io": {
"engine": "Discourse",
"urlMain": "https://forum.snapcraft.io",
"usernameClaimed": "alex",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"forum.zorin.com": {
"engine": "Discourse",
"urlMain": "https://forum.zorin.com",
"usernameClaimed": "alex",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"codeseller.ru": {
"engine": "Wordpress/Author",
"urlMain": "https://codeseller.ru",
"usernameClaimed": "alex",
"usernameUnclaimed": "noonewouldeverusethis7"
},
"linuxpip.org": {
"engine": "Wordpress/Author",
"urlMain": "https://linuxpip.org",
"usernameClaimed": "diehard",
"usernameUnclaimed": "noonewouldeverusethis7"
}
},
"engines": {
@@ -23689,6 +23751,24 @@
"<meta name=\"generator\" content=\"Discourse"
]
},
"Wordpress/Author": {
"name": "Wordpress/Author",
"site": {
"presenseStrs": [
"author-",
"author/"
],
"absenceStrs": [
"error404"
],
"checkType": "message",
"url": "{urlMain}/author/{username}/"
},
"presenseStrs": [
"/wp-admin",
"/wp-includes/wlwmanifest.xml"
]
},
"engine404": {
"name": "engine404",
"site": {
+3 -9
@@ -2,7 +2,6 @@
"""Maigret Sites Information"""
import copy
import json
import re
import sys
import requests
@@ -87,13 +86,12 @@ class MaigretSite:
url = self.url
for group in ['urlMain', 'urlSubpath']:
if group in url:
-url = url.replace('{'+group+'}', self.__dict__[CaseConverter.camel_to_snake(group)])
+url = url.replace('{' + group + '}', self.__dict__[CaseConverter.camel_to_snake(group)])
self.url_regexp = URLMatcher.make_profile_url_regexp(url, self.regex_check)
def detect_username(self, url: str) -> str:
if self.url_regexp:
-import logging
match_groups = self.url_regexp.match(url)
if match_groups:
return match_groups.groups()[-1].rstrip('/')
@@ -238,7 +236,6 @@ class MaigretDatabase:
return self
def load_from_json(self, json_data: dict) -> MaigretDatabase:
# Add all of site information from the json file to internal site list.
site_data = json_data.get("sites", {})
@@ -263,7 +260,6 @@ class MaigretDatabase:
return self
def load_from_str(self, db_str: str) -> MaigretDatabase:
try:
data = json.loads(db_str)
@@ -274,7 +270,6 @@ class MaigretDatabase:
return self.load_from_json(data)
def load_from_url(self, url: str) -> MaigretDatabase:
is_url_valid = url.startswith('http://') or url.startswith('https://')
@@ -303,7 +298,6 @@ class MaigretDatabase:
return self.load_from_json(data)
def load_from_file(self, filename: str) -> MaigretDatabase:
try:
with open(filename, 'r', encoding='utf-8') as file:
@@ -364,7 +358,7 @@ class MaigretDatabase:
continue
tags[tag] = tags.get(tag, 0) + 1
-output += f'Enabled/total sites: {total_count-disabled_count}/{total_count}\n'
+output += f'Enabled/total sites: {total_count - disabled_count}/{total_count}\n'
output += 'Top sites\' profile URLs:\n'
for url, count in sorted(urls.items(), key=lambda x: x[1], reverse=True)[:20]:
if count == 1:
@@ -377,4 +371,4 @@ class MaigretDatabase:
mark = ' (non-standard)'
output += f'{count}\t{tag}{mark}\n'
-return output
+return output
+1 -2
@@ -1,5 +1,4 @@
import difflib
import json
import requests
from mock import Mock
@@ -89,7 +88,7 @@ async def submit_dialog(db, url_exists, cookie_file):
domain_raw = URL_RE.sub('', url_exists).strip().strip('/')
domain_raw = domain_raw.split('/')[0]
-matched_sites = list(filter(lambda x: domain_raw in x.url_main+x.url, db.sites))
+matched_sites = list(filter(lambda x: domain_raw in x.url_main + x.url, db.sites))
if matched_sites:
print(f'Sites with domain "{domain_raw}" already exists in the Maigret database!')
status = lambda s: '(disabled)' if s.disabled else ''
+1 -2
@@ -1,5 +1,4 @@
import re
-import sys
class CaseConverter:
@@ -55,4 +54,4 @@ class URLMatcher:
url_regexp = url_main_part.replace('{username}', f'({username_regexp})')
regexp_str = self._HTTP_URL_RE_STR.replace('(.+)', url_regexp)
-return re.compile(regexp_str)
+return re.compile(regexp_str)
+2 -2
@@ -1,11 +1,11 @@
import glob
import logging
import os
import pytest
from _pytest.mark import Mark
from mock import Mock
-from maigret.sites import MaigretDatabase, MaigretSite
+from maigret.sites import MaigretDatabase
CUR_PATH = os.path.dirname(os.path.realpath(__file__))
JSON_FILE = os.path.join(CUR_PATH, '../maigret/resources/data.json')
+1
@@ -1,5 +1,6 @@
"""Maigret activation test functions"""
import json
import aiohttp
import pytest
from mock import Mock
+2 -1
@@ -1,10 +1,11 @@
"""Maigret main module test functions"""
import asyncio
import pytest
from mock import Mock
from maigret.maigret import self_check
-from maigret.sites import MaigretDatabase, MaigretSite
+from maigret.sites import MaigretDatabase
EXAMPLE_DB = {
'engines': {
+20 -20
@@ -1,33 +1,32 @@
"""Maigret Database test functions"""
from maigret.sites import MaigretDatabase, MaigretSite
EXAMPLE_DB = {
'engines': {
"XenForo": {
"presenseStrs": ["XenForo"],
"site": {
"absenceStrs": [
"The specified member cannot be found. Please enter a member's entire name.",
],
"checkType": "message",
"errors": {
"You must be logged-in to do that.": "Login required"
},
"url": "{urlMain}{urlSubpath}/members/?username={username}"
}
"presenseStrs": ["XenForo"],
"site": {
"absenceStrs": [
"The specified member cannot be found. Please enter a member's entire name.",
],
"checkType": "message",
"errors": {
"You must be logged-in to do that.": "Login required"
},
"url": "{urlMain}{urlSubpath}/members/?username={username}"
}
},
},
'sites': {
"Amperka": {
"engine": "XenForo",
"rank": 121613,
"tags": [
"ru"
],
"urlMain": "http://forum.amperka.ru",
"usernameClaimed": "adam",
"usernameUnclaimed": "noonewouldeverusethis7"
"engine": "XenForo",
"rank": 121613,
"tags": [
"ru"
],
"urlMain": "http://forum.amperka.ru",
"usernameClaimed": "adam",
"usernameUnclaimed": "noonewouldeverusethis7"
},
}
}
@@ -167,6 +166,7 @@ def test_ranked_sites_dict_disabled():
assert len(db.ranked_sites_dict()) == 2
assert len(db.ranked_sites_dict(disabled=False)) == 1
def test_ranked_sites_dict_id_type():
db = MaigretDatabase()
db.update_site(MaigretSite('1', {}))
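For readers skimming the test changes: EXAMPLE_DB above exercises engine inheritance, with the Amperka site picking up XenForo's check settings. A short sketch of how such a dictionary is consumed, using only methods visible in this diff (the printed attributes are assumptions):

from maigret.sites import MaigretDatabase

db = MaigretDatabase().load_from_json(EXAMPLE_DB)

# Amperka declares "engine": "XenForo", so after loading it should carry the
# engine's url template and absence/error strings alongside its own urlMain.
for site in db.sites:
    print(site.name, site.url_main)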
+46 -38
@@ -1,66 +1,74 @@
"""Maigret utils test functions"""
import itertools
import re
from maigret.utils import CaseConverter, is_country_tag, enrich_link_str, URLMatcher
def test_case_convert_camel_to_snake():
a = 'SnakeCasedString'
b = CaseConverter.camel_to_snake(a)
a = 'SnakeCasedString'
b = CaseConverter.camel_to_snake(a)
assert b == 'snake_cased_string'
assert b == 'snake_cased_string'
def test_case_convert_snake_to_camel():
a = 'camel_cased_string'
b = CaseConverter.snake_to_camel(a)
a = 'camel_cased_string'
b = CaseConverter.snake_to_camel(a)
assert b == 'camelCasedString'
assert b == 'camelCasedString'
def test_case_convert_snake_to_title():
a = 'camel_cased_string'
b = CaseConverter.snake_to_title(a)
a = 'camel_cased_string'
b = CaseConverter.snake_to_title(a)
assert b == 'Camel cased string'
assert b == 'Camel cased string'
def test_is_country_tag():
assert is_country_tag('ru') == True
assert is_country_tag('FR') == True
assert is_country_tag('ru') == True
assert is_country_tag('FR') == True
assert is_country_tag('a1') == False
assert is_country_tag('dating') == False
assert is_country_tag('a1') == False
assert is_country_tag('dating') == False
assert is_country_tag('global') == True
assert is_country_tag('global') == True
def test_enrich_link_str():
assert enrich_link_str('test') == 'test'
assert enrich_link_str(' www.flickr.com/photos/alexaimephotography/') == '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>'
assert enrich_link_str('test') == 'test'
assert enrich_link_str(
' www.flickr.com/photos/alexaimephotography/') == '<a class="auto-link" href="www.flickr.com/photos/alexaimephotography/">www.flickr.com/photos/alexaimephotography/</a>'
def test_url_extract_main_part():
url_main_part = 'flickr.com/photos/alexaimephotography'
url_main_part = 'flickr.com/photos/alexaimephotography'
parts = [
['http://', 'https://'],
['www.', ''],
[url_main_part],
['/', ''],
]
parts = [
['http://', 'https://'],
['www.', ''],
[url_main_part],
['/', ''],
]
url_regexp = re.compile('^https?://(www.)?flickr.com/photos/(.+?)$')
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
assert URLMatcher.extract_main_part(url) == url_main_part
assert not url_regexp.match(url) is None
url_regexp = re.compile('^https?://(www.)?flickr.com/photos/(.+?)$')
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
assert URLMatcher.extract_main_part(url) == url_main_part
assert not url_regexp.match(url) is None
def test_url_make_profile_url_regexp():
url_main_part = 'flickr.com/photos/{username}'
url_main_part = 'flickr.com/photos/{username}'
parts = [
['http://', 'https://'],
['www.', ''],
[url_main_part],
['/', ''],
]
parts = [
['http://', 'https://'],
['www.', ''],
[url_main_part],
['/', ''],
]
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
assert URLMatcher.make_profile_url_regexp(url).pattern == r'^https?://(www.)?flickr\.com/photos/(.+?)$'
for url_parts in itertools.product(*parts):
url = ''.join(url_parts)
assert URLMatcher.make_profile_url_regexp(url).pattern == r'^https?://(www.)?flickr\.com/photos/(.+?)$'
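As a closing note on the reworked URL tests: the generated pattern accepts every scheme / optional-www / trailing-slash variant, and its last capture group holds the username, which is what detect_username() in the sites code strips and returns. A quick standalone check (hypothetical usage, mirroring the asserts above):

import re

pattern = re.compile(r'^https?://(www.)?flickr\.com/photos/(.+?)$')

for url in (
    'https://www.flickr.com/photos/alexaimephotography/',
    'http://flickr.com/photos/alexaimephotography',
):
    match = pattern.match(url)
    # The last group carries the username part; strip a possible trailing slash.
    print(match.groups()[-1].rstrip('/'))  # -> alexaimephotography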