mirror of
https://github.com/soxoj/maigret.git
synced 2026-05-06 22:19:01 +00:00
- Added XMind support
- Added export tag in result
This commit is contained in:
+94
-7
@@ -26,6 +26,7 @@ from .notify import QueryNotifyPrint
|
||||
from .result import QueryResult, QueryStatus
|
||||
from .sites import SitesInformation
|
||||
|
||||
import xmind
|
||||
|
||||
__version__ = '0.1.7'
|
||||
|
||||
@@ -139,6 +140,10 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
|
||||
if not response:
|
||||
return results_info
|
||||
|
||||
fulltags = []
|
||||
if ("tags" in net_info.keys()):
|
||||
fulltags = net_info["tags"]
|
||||
|
||||
# Retrieve other site information again
|
||||
username = results_info['username']
|
||||
is_parsing_enabled = results_info['parsing_enabled']
|
||||
@@ -191,7 +196,7 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
|
||||
url,
|
||||
QueryStatus.UNKNOWN,
|
||||
query_time=response_time,
|
||||
context=f'{error_text}: {site_error_text}')
|
||||
context=f'{error_text}: {site_error_text}', tags=fulltags)
|
||||
elif error_type == "message":
|
||||
absence_flags = net_info.get("errorMsg")
|
||||
is_absence_flags_list = isinstance(absence_flags, list)
|
||||
@@ -203,13 +208,13 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
|
||||
social_network,
|
||||
url,
|
||||
QueryStatus.CLAIMED,
|
||||
query_time=response_time)
|
||||
query_time=response_time, tags=fulltags)
|
||||
else:
|
||||
result = QueryResult(username,
|
||||
social_network,
|
||||
url,
|
||||
QueryStatus.AVAILABLE,
|
||||
query_time=response_time)
|
||||
query_time=response_time, tags=fulltags)
|
||||
elif error_type == "status_code":
|
||||
# Checks if the status code of the response is 2XX
|
||||
if (not status_code >= 300 or status_code < 200) and is_presense_detected:
|
||||
@@ -217,13 +222,13 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
|
||||
social_network,
|
||||
url,
|
||||
QueryStatus.CLAIMED,
|
||||
query_time=response_time)
|
||||
query_time=response_time, tags=fulltags)
|
||||
else:
|
||||
result = QueryResult(username,
|
||||
social_network,
|
||||
url,
|
||||
QueryStatus.AVAILABLE,
|
||||
query_time=response_time)
|
||||
query_time=response_time, tags=fulltags)
|
||||
elif error_type == "response_url":
|
||||
# For this detection method, we have turned off the redirect.
|
||||
# So, there is no need to check the response URL: it will always
|
||||
@@ -235,13 +240,13 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
|
||||
social_network,
|
||||
url,
|
||||
QueryStatus.CLAIMED,
|
||||
query_time=response_time)
|
||||
query_time=response_time, tags=fulltags)
|
||||
else:
|
||||
result = QueryResult(username,
|
||||
social_network,
|
||||
url,
|
||||
QueryStatus.AVAILABLE,
|
||||
query_time=response_time)
|
||||
query_time=response_time, tags=fulltags)
|
||||
else:
|
||||
# It should be impossible to ever get here...
|
||||
raise ValueError(f"Unknown Error Type '{error_type}' for "
|
||||
@@ -340,6 +345,11 @@ async def maigret(username, site_data, query_notify, logger,
|
||||
|
||||
# First create futures for all requests. This allows for the requests to run in parallel
|
||||
for social_network, net_info in site_data.items():
|
||||
|
||||
fulltags = []
|
||||
if ("tags" in net_info.keys()):
|
||||
fulltags = net_info["tags"]
|
||||
|
||||
if net_info.get('type', 'username') != id_type:
|
||||
continue
|
||||
|
||||
@@ -359,6 +369,7 @@ async def maigret(username, site_data, query_notify, logger,
|
||||
results_site['parsing_enabled'] = recursive_search
|
||||
results_site['url_main'] = net_info.get("urlMain")
|
||||
|
||||
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11.1; rv:55.0) Gecko/20100101 Firefox/55.0',
|
||||
}
|
||||
@@ -574,6 +585,66 @@ async def self_check(json_file, logger):
|
||||
data['sites'] = all_sites
|
||||
json.dump(data, f, indent=4)
|
||||
|
||||
def genxmindfile(filename, username,results):
|
||||
print("Generating XMIND8 file")
|
||||
if os.path.exists(filename):
|
||||
os.remove(filename)
|
||||
workbook = xmind.load(filename)
|
||||
sheet = workbook.getPrimarySheet()
|
||||
design_sheet1(sheet, username, results)
|
||||
xmind.save(workbook, path=filename)
|
||||
|
||||
## detect if tag rappresent a nation
|
||||
def checknation(tag):
|
||||
if re.match("^([a-z]){2}$", tag):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def design_sheet1(sheet, username, results):
|
||||
##all tag list
|
||||
alltags = {}
|
||||
|
||||
sheet.setTitle("%s Analysis"%(username))
|
||||
root_topic1 = sheet.getRootTopic()
|
||||
root_topic1.setTitle("%s"%(username))
|
||||
|
||||
undefinedsection = root_topic1.addSubTopic()
|
||||
undefinedsection.setTitle("Undefined")
|
||||
alltags["undefined"] = undefinedsection
|
||||
|
||||
for website_name in results:
|
||||
dictionary = results[website_name]
|
||||
|
||||
if dictionary.get("status").status == QueryStatus.CLAIMED:
|
||||
## firsttime I found that entry
|
||||
for tag in dictionary.get("status").tags.split(","):
|
||||
if(tag.strip() == ""):
|
||||
continue
|
||||
if( tag not in alltags.keys()):
|
||||
if (not checknation(tag)):
|
||||
tagsection = root_topic1.addSubTopic()
|
||||
tagsection.setTitle(tag)
|
||||
alltags[tag] = tagsection
|
||||
|
||||
category = None
|
||||
userlink= None
|
||||
for tag in dictionary.get("status").tags.split(","):
|
||||
if(tag.strip() == ""):
|
||||
continue
|
||||
if(not checknation(tag)):
|
||||
category = tag
|
||||
|
||||
if(category is None):
|
||||
category = "undefined"
|
||||
userlink = undefinedsection.addSubTopic()
|
||||
else:
|
||||
userlink = alltags[category].addSubTopic()
|
||||
userlink.addLabel(dictionary.get("status").site_url_user)
|
||||
|
||||
#for tag in dictionary.get("status").tags.split(","):
|
||||
# if( tag != category ):
|
||||
# sheet.createRelationship(userlink.getID(), alltags[tag].getID(),"other tag")
|
||||
|
||||
async def main():
|
||||
version_string = f"%(prog)s {__version__}\n" + \
|
||||
@@ -673,6 +744,13 @@ async def main():
|
||||
dest="tags", default='',
|
||||
help="Specify tags of sites."
|
||||
)
|
||||
|
||||
parser.add_argument("-x","--xmind",
|
||||
action="store_true",
|
||||
dest="xmind", default=False,
|
||||
help="Generate an xmind 8 mindmap"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Logging
|
||||
@@ -830,13 +908,22 @@ async def main():
|
||||
|
||||
if args.output:
|
||||
result_file = args.output
|
||||
if (args.xmind):
|
||||
xmind_path = f"{username}.xmind"
|
||||
elif args.folderoutput:
|
||||
# The usernames results should be stored in a targeted folder.
|
||||
# If the folder doesn't exist, create it first
|
||||
os.makedirs(args.folderoutput, exist_ok=True)
|
||||
result_file = os.path.join(args.folderoutput, f"{username}.txt")
|
||||
if (args.xmind):
|
||||
xmind_path = os.path.join(args.folderoutput, f"{username}.xmind")
|
||||
else:
|
||||
result_file = f"{username}.txt"
|
||||
if (args.xmind):
|
||||
xmind_path = f"{username}.xmind"
|
||||
|
||||
if (args.xmind):
|
||||
genxmindfile(xmind_path, username, results)
|
||||
|
||||
with open(result_file, "w", encoding="utf-8") as file:
|
||||
exists_counter = 0
|
||||
|
||||
+7
-1
@@ -34,7 +34,7 @@ class QueryResult():
|
||||
"""
|
||||
|
||||
def __init__(self, username, site_name, site_url_user, status, ids_data=None,
|
||||
query_time=None, context=None):
|
||||
query_time=None, context=None, tags=None):
|
||||
"""Create Query Result Object.
|
||||
|
||||
Contains information about a specific method of detecting usernames on
|
||||
@@ -73,6 +73,12 @@ class QueryResult():
|
||||
self.context = context
|
||||
self.ids_data = ids_data
|
||||
|
||||
self.tags = ""
|
||||
if (tags is not None):
|
||||
TAGstring = "".join(['%s,' % tags for tags in tags])
|
||||
TAGstring = TAGstring[:-1]
|
||||
self.tags = TAGstring
|
||||
|
||||
return
|
||||
|
||||
def __str__(self):
|
||||
|
||||
Reference in New Issue
Block a user