mirror of
https://github.com/soxoj/maigret.git
synced 2026-05-07 06:24:35 +00:00
Reports refactoring & tests
This commit is contained in:
+8
-106
@@ -26,6 +26,7 @@ from socid_extractor import parse, extract
|
|||||||
from .notify import QueryNotifyPrint
|
from .notify import QueryNotifyPrint
|
||||||
from .result import QueryResult, QueryStatus
|
from .result import QueryResult, QueryStatus
|
||||||
from .sites import MaigretDatabase, MaigretSite
|
from .sites import MaigretDatabase, MaigretSite
|
||||||
|
from .report import save_csv_report, genxmindfile
|
||||||
|
|
||||||
import xmind
|
import xmind
|
||||||
|
|
||||||
@@ -526,6 +527,7 @@ async def site_self_check(site_name, site_data, logger, no_progressbar=False):
|
|||||||
forced=True,
|
forced=True,
|
||||||
no_progressbar=no_progressbar,
|
no_progressbar=no_progressbar,
|
||||||
)
|
)
|
||||||
|
|
||||||
# don't disable entries with other ids types
|
# don't disable entries with other ids types
|
||||||
if site_name not in results:
|
if site_name not in results:
|
||||||
logger.info(results)
|
logger.info(results)
|
||||||
@@ -592,66 +594,6 @@ async def self_check(json_file, logger):
|
|||||||
data['sites'] = all_sites
|
data['sites'] = all_sites
|
||||||
json.dump(data, f, indent=4)
|
json.dump(data, f, indent=4)
|
||||||
|
|
||||||
def genxmindfile(filename, username,results):
|
|
||||||
print("Generating XMIND8 file")
|
|
||||||
if os.path.exists(filename):
|
|
||||||
os.remove(filename)
|
|
||||||
workbook = xmind.load(filename)
|
|
||||||
sheet = workbook.getPrimarySheet()
|
|
||||||
design_sheet1(sheet, username, results)
|
|
||||||
xmind.save(workbook, path=filename)
|
|
||||||
|
|
||||||
## detect if tag rappresent a nation
|
|
||||||
def checknation(tag):
|
|
||||||
if re.match("^([a-z]){2}$", tag):
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def design_sheet1(sheet, username, results):
|
|
||||||
##all tag list
|
|
||||||
alltags = {}
|
|
||||||
|
|
||||||
sheet.setTitle("%s Analysis"%(username))
|
|
||||||
root_topic1 = sheet.getRootTopic()
|
|
||||||
root_topic1.setTitle("%s"%(username))
|
|
||||||
|
|
||||||
undefinedsection = root_topic1.addSubTopic()
|
|
||||||
undefinedsection.setTitle("Undefined")
|
|
||||||
alltags["undefined"] = undefinedsection
|
|
||||||
|
|
||||||
for website_name in results:
|
|
||||||
dictionary = results[website_name]
|
|
||||||
|
|
||||||
if dictionary.get("status").status == QueryStatus.CLAIMED:
|
|
||||||
## firsttime I found that entry
|
|
||||||
for tag in dictionary.get("status").tags.split(","):
|
|
||||||
if(tag.strip() == ""):
|
|
||||||
continue
|
|
||||||
if( tag not in alltags.keys()):
|
|
||||||
if (not checknation(tag)):
|
|
||||||
tagsection = root_topic1.addSubTopic()
|
|
||||||
tagsection.setTitle(tag)
|
|
||||||
alltags[tag] = tagsection
|
|
||||||
|
|
||||||
category = None
|
|
||||||
userlink= None
|
|
||||||
for tag in dictionary.get("status").tags.split(","):
|
|
||||||
if(tag.strip() == ""):
|
|
||||||
continue
|
|
||||||
if(not checknation(tag)):
|
|
||||||
category = tag
|
|
||||||
|
|
||||||
if(category is None):
|
|
||||||
category = "undefined"
|
|
||||||
userlink = undefinedsection.addSubTopic()
|
|
||||||
else:
|
|
||||||
userlink = alltags[category].addSubTopic()
|
|
||||||
userlink.addLabel(dictionary.get("status").site_url_user)
|
|
||||||
|
|
||||||
#for tag in dictionary.get("status").tags.split(","):
|
|
||||||
# if( tag != category ):
|
|
||||||
# sheet.createRelationship(userlink.getID(), alltags[tag].getID(),"other tag")
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
version_string = f"%(prog)s {__version__}\n" + \
|
version_string = f"%(prog)s {__version__}\n" + \
|
||||||
@@ -680,12 +622,9 @@ async def main():
|
|||||||
parser.add_argument("--rank", "-r",
|
parser.add_argument("--rank", "-r",
|
||||||
action="store_true", dest="rank", default=False,
|
action="store_true", dest="rank", default=False,
|
||||||
help="Present websites ordered by their Alexa.com global rank in popularity.")
|
help="Present websites ordered by their Alexa.com global rank in popularity.")
|
||||||
parser.add_argument("--folderoutput", "-fo", dest="folderoutput",
|
parser.add_argument("--folderoutput", "-fo", dest="folderoutput", default="reports",
|
||||||
help="If using multiple usernames, the output of the results will be saved to this folder."
|
help="If using multiple usernames, the output of the results will be saved to this folder."
|
||||||
)
|
)
|
||||||
parser.add_argument("--output", "-o", dest="output",
|
|
||||||
help="If using single username, the output of the result will be saved to this file."
|
|
||||||
)
|
|
||||||
parser.add_argument("--csv",
|
parser.add_argument("--csv",
|
||||||
action="store_true", dest="csv", default=False,
|
action="store_true", dest="csv", default=False,
|
||||||
help="Create Comma-Separated Values (CSV) File."
|
help="Create Comma-Separated Values (CSV) File."
|
||||||
@@ -791,16 +730,6 @@ async def main():
|
|||||||
if args.proxy is not None:
|
if args.proxy is not None:
|
||||||
print("Using the proxy: " + args.proxy)
|
print("Using the proxy: " + args.proxy)
|
||||||
|
|
||||||
# Check if both output methods are entered as input.
|
|
||||||
if args.output is not None and args.folderoutput is not None:
|
|
||||||
print("You can only use one of the output methods.")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# Check validity for single username output.
|
|
||||||
if args.output is not None and len(args.username) != 1:
|
|
||||||
print("You can only use --output with a single username")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if args.parse_url:
|
if args.parse_url:
|
||||||
page, _ = parse(args.parse_url, cookies_str='')
|
page, _ = parse(args.parse_url, cookies_str='')
|
||||||
info = extract(page)
|
info = extract(page)
|
||||||
@@ -906,23 +835,19 @@ async def main():
|
|||||||
forced=args.use_disabled_sites,
|
forced=args.use_disabled_sites,
|
||||||
)
|
)
|
||||||
|
|
||||||
if args.output:
|
if args.folderoutput:
|
||||||
result_file = args.output
|
|
||||||
if (args.xmind):
|
|
||||||
xmind_path = f"{username}.xmind"
|
|
||||||
elif args.folderoutput:
|
|
||||||
# The usernames results should be stored in a targeted folder.
|
# The usernames results should be stored in a targeted folder.
|
||||||
# If the folder doesn't exist, create it first
|
# If the folder doesn't exist, create it first
|
||||||
os.makedirs(args.folderoutput, exist_ok=True)
|
os.makedirs(args.folderoutput, exist_ok=True)
|
||||||
result_file = os.path.join(args.folderoutput, f"{username}.txt")
|
result_file = os.path.join(args.folderoutput, f"{username}.txt")
|
||||||
if (args.xmind):
|
if args.xmind:
|
||||||
xmind_path = os.path.join(args.folderoutput, f"{username}.xmind")
|
xmind_path = os.path.join(args.folderoutput, f"{username}.xmind")
|
||||||
else:
|
else:
|
||||||
result_file = f"{username}.txt"
|
result_file = f"{username}.txt"
|
||||||
if (args.xmind):
|
if args.xmind:
|
||||||
xmind_path = f"{username}.xmind"
|
xmind_path = f"{username}.xmind"
|
||||||
|
|
||||||
if (args.xmind):
|
if args.xmind:
|
||||||
genxmindfile(xmind_path, username, results)
|
genxmindfile(xmind_path, username, results)
|
||||||
|
|
||||||
with open(result_file, "w", encoding="utf-8") as file:
|
with open(result_file, "w", encoding="utf-8") as file:
|
||||||
@@ -943,30 +868,7 @@ async def main():
|
|||||||
file.write(f"Total Websites Username Detected On : {exists_counter}")
|
file.write(f"Total Websites Username Detected On : {exists_counter}")
|
||||||
|
|
||||||
if args.csv:
|
if args.csv:
|
||||||
with open(username + ".csv", "w", newline='', encoding="utf-8") as csv_report:
|
save_csv_report(username, results)
|
||||||
writer = csv.writer(csv_report)
|
|
||||||
writer.writerow(['username',
|
|
||||||
'name',
|
|
||||||
'url_main',
|
|
||||||
'url_user',
|
|
||||||
'exists',
|
|
||||||
'http_status',
|
|
||||||
'response_time_s'
|
|
||||||
]
|
|
||||||
)
|
|
||||||
for site in results:
|
|
||||||
response_time_s = results[site]['status'].query_time
|
|
||||||
if response_time_s is None:
|
|
||||||
response_time_s = ""
|
|
||||||
writer.writerow([username,
|
|
||||||
site,
|
|
||||||
results[site]['url_main'],
|
|
||||||
results[site]['url_user'],
|
|
||||||
str(results[site]['status'].status),
|
|
||||||
results[site]['http_status'],
|
|
||||||
response_time_s
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
|
|||||||
@@ -18059,13 +18059,6 @@
|
|||||||
"usernameClaimed": "noonewouldeverusethis7",
|
"usernameClaimed": "noonewouldeverusethis7",
|
||||||
"usernameUnclaimed": "alex"
|
"usernameUnclaimed": "alex"
|
||||||
},
|
},
|
||||||
"vaz04.ru": {
|
|
||||||
"engine": "uCoz",
|
|
||||||
"alexaRank": 1821706,
|
|
||||||
"urlMain": "http://vaz04.ru",
|
|
||||||
"usernameClaimed": "noonewouldeverusethis7",
|
|
||||||
"usernameUnclaimed": "alex"
|
|
||||||
},
|
|
||||||
"vdv-belarus.ucoz.com": {
|
"vdv-belarus.ucoz.com": {
|
||||||
"engine": "uCoz",
|
"engine": "uCoz",
|
||||||
"urlMain": "http://vdv-belarus.ucoz.com",
|
"urlMain": "http://vdv-belarus.ucoz.com",
|
||||||
|
|||||||
+1
-1
@@ -7,7 +7,7 @@ import sys
|
|||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
from maigret.utils import CaseConverter
|
from .utils import CaseConverter
|
||||||
|
|
||||||
|
|
||||||
class MaigretEngine:
|
class MaigretEngine:
|
||||||
|
|||||||
@@ -11,3 +11,8 @@ class CaseConverter:
|
|||||||
formatted = ''.join(word.title() for word in snakecased_string.split('_'))
|
formatted = ''.join(word.title() for word in snakecased_string.split('_'))
|
||||||
result = formatted[0].lower() + formatted[1:]
|
result = formatted[0].lower() + formatted[1:]
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def is_country_tag(tag):
|
||||||
|
"""detect if tag represent a country"""
|
||||||
|
return bool(re.match("^([a-z]){2}$", tag))
|
||||||
|
|||||||
+7
-1
@@ -1,5 +1,5 @@
|
|||||||
"""Maigret utils test functions"""
|
"""Maigret utils test functions"""
|
||||||
from maigret.utils import CaseConverter
|
from maigret.utils import CaseConverter, is_country_tag
|
||||||
|
|
||||||
|
|
||||||
def test_case_convert_camel_to_snake():
|
def test_case_convert_camel_to_snake():
|
||||||
@@ -13,3 +13,9 @@ def test_case_convert_snake_to_camel():
|
|||||||
b = CaseConverter.snake_to_camel(a)
|
b = CaseConverter.snake_to_camel(a)
|
||||||
|
|
||||||
assert b == 'camelCasedString'
|
assert b == 'camelCasedString'
|
||||||
|
|
||||||
|
def test_is_country_tag():
|
||||||
|
assert is_country_tag('ru') == True
|
||||||
|
|
||||||
|
assert is_country_tag('a1') == False
|
||||||
|
assert is_country_tag('dating') == False
|
||||||
|
|||||||
Reference in New Issue
Block a user