mirror of
https://github.com/soxoj/maigret.git
synced 2026-05-07 14:34:33 +00:00
Refactoring: updated data & sites storing, tests added
This commit is contained in:
+125
-94
@@ -1,96 +1,118 @@
|
||||
"""Maigret Sites Information"""
|
||||
from __future__ import annotations
|
||||
import copy
|
||||
import json
|
||||
import operator
|
||||
import sys
|
||||
|
||||
import requests
|
||||
|
||||
from maigret.utils import CaseConverter
|
||||
|
||||
|
||||
class MaigretEngine:
|
||||
def __init__(self, name, *args, **kwargs):
|
||||
def __init__(self, name, data):
|
||||
self.name = name
|
||||
self.__dict__.update(kwargs)
|
||||
self.__dict__.update(data)
|
||||
|
||||
@property
|
||||
def json(self):
|
||||
return self.__dict__
|
||||
|
||||
|
||||
class MaigretSite:
|
||||
def __init__(self, name, url_main, url_username_format, popularity_rank,
|
||||
username_claimed, username_unclaimed,
|
||||
information):
|
||||
"""Create Site Information Object.
|
||||
|
||||
Contains information about a specific web site.
|
||||
|
||||
Keyword Arguments:
|
||||
self -- This object.
|
||||
name -- String which identifies site.
|
||||
url_main -- String containing URL for home of site.
|
||||
url_username_format -- String containing URL for Username format
|
||||
on site.
|
||||
NOTE: The string should contain the
|
||||
token "{}" where the username should
|
||||
be substituted. For example, a string
|
||||
of "https://somesite.com/users/{}"
|
||||
indicates that the individual
|
||||
usernames would show up under the
|
||||
"https://somesite.com/users/" area of
|
||||
the web site.
|
||||
popularity_rank -- Integer indicating popularity of site.
|
||||
In general, smaller numbers mean more
|
||||
popular ("0" or None means ranking
|
||||
information not available).
|
||||
username_claimed -- String containing username which is known
|
||||
to be claimed on web site.
|
||||
username_unclaimed -- String containing username which is known
|
||||
to be unclaimed on web site.
|
||||
information -- Dictionary containing all known information
|
||||
about web site.
|
||||
NOTE: Custom information about how to
|
||||
actually detect the existence of the
|
||||
username will be included in this
|
||||
dictionary. This information will
|
||||
be needed by the detection method,
|
||||
but it is only recorded in this
|
||||
object for future use.
|
||||
|
||||
Return Value:
|
||||
Nothing.
|
||||
"""
|
||||
|
||||
def __init__(self, name, information):
|
||||
self.name = name
|
||||
self.url_main = url_main
|
||||
self.url_username_format = url_username_format
|
||||
|
||||
if (popularity_rank is None) or (popularity_rank == 0):
|
||||
# We do not know the popularity, so make site go to bottom of list.
|
||||
popularity_rank = sys.maxsize
|
||||
self.popularity_rank = popularity_rank
|
||||
self.disabled = False
|
||||
self.similar_search = False
|
||||
self.ignore_403 = False
|
||||
self.tags = []
|
||||
|
||||
self.username_claimed = username_claimed
|
||||
self.username_unclaimed = username_unclaimed
|
||||
self.information = information
|
||||
self.disabled = information.get('disabled', False)
|
||||
self.similar_search = information.get('similarSearch', False)
|
||||
self.ignore_403 = information.get('ignore_403', False)
|
||||
self.tags = information.get('tags', [])
|
||||
self.type = 'username'
|
||||
self.headers = {}
|
||||
self.errors = {}
|
||||
self.url_subpath = ''
|
||||
self.regex_check = None
|
||||
self.url_probe = None
|
||||
self.check_type = ''
|
||||
self.request_head_only = ''
|
||||
|
||||
self.type = information.get('type', 'username')
|
||||
self.headers = information.get('headers', {})
|
||||
self.errors = information.get('errors', {})
|
||||
self.url_subpath = information.get('urlSubpath', '')
|
||||
self.regex_check = information.get('regexCheck', None)
|
||||
self.url_probe = information.get('urlProbe', None)
|
||||
self.check_type = information.get('errorType', '')
|
||||
self.request_head_only = information.get('request_head_only', '')
|
||||
self.presense_strs = []
|
||||
self.absence_strs = []
|
||||
|
||||
self.presense_strs = information.get('presenseStrs', [])
|
||||
self.absence_strs = information.get('errorMsg', [])
|
||||
self.engine = None
|
||||
self.engine_data = {}
|
||||
self.engine_obj = None
|
||||
self.request_future = None
|
||||
self.alexa_rank = None
|
||||
|
||||
for k, v in information.items():
|
||||
self.__dict__[CaseConverter.camel_to_snake(k)] = v
|
||||
|
||||
if (self.alexa_rank is None) or (self.alexa_rank == 0):
|
||||
# We do not know the popularity, so make site go to bottom of list.
|
||||
self.alexa_rank = sys.maxsize
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name} ({self.url_main})"
|
||||
|
||||
@property
|
||||
def json(self):
|
||||
result = {}
|
||||
for k, v in self.__dict__.items():
|
||||
# convert to camelCase
|
||||
field = CaseConverter.snake_to_camel(k)
|
||||
# strip empty elements
|
||||
if v in (False, '', [], {}, None, sys.maxsize, 'username'):
|
||||
continue
|
||||
if field in ['name', 'engineData', 'requestFuture', 'detectedEngine', 'engineObj']:
|
||||
continue
|
||||
result[field] = v
|
||||
|
||||
return result
|
||||
|
||||
def update(self, updates: dict) -> MaigretSite:
|
||||
self.__dict__.update(updates)
|
||||
|
||||
return self
|
||||
|
||||
def update_from_engine(self, engine: MaigretEngine) -> MaigretSite:
|
||||
engine_data = engine.site
|
||||
for k, v in engine_data.items():
|
||||
field = CaseConverter.camel_to_snake(k)
|
||||
if isinstance(v, dict):
|
||||
# TODO: assertion of intersecting keys
|
||||
# update dicts like errors
|
||||
self.__dict__.get(field, {}).update(v)
|
||||
else:
|
||||
self.__dict__[field] = v
|
||||
|
||||
self.engine_obj = engine
|
||||
|
||||
return self
|
||||
|
||||
def strip_engine_data(self) -> MaigretSite:
|
||||
if not self.engine_obj:
|
||||
return self
|
||||
|
||||
self.request_future = None
|
||||
self_copy = copy.deepcopy(self)
|
||||
engine_data = self_copy.engine_obj.site
|
||||
for field in engine_data.keys():
|
||||
if isinstance(engine_data[field], dict):
|
||||
for k in engine_data[field].keys():
|
||||
del self_copy.__dict__[field][k]
|
||||
continue
|
||||
|
||||
if field in list(self_copy.__dict__.keys()):
|
||||
del self_copy.__dict__[field]
|
||||
if CaseConverter.camel_to_snake(field) in list(self_copy.__dict__.keys()):
|
||||
del self_copy.__dict__[CaseConverter.camel_to_snake(field)]
|
||||
|
||||
return self_copy
|
||||
|
||||
|
||||
class MaigretDatabase:
|
||||
def __init__(self):
|
||||
@@ -98,20 +120,43 @@ class MaigretDatabase:
|
||||
self._engines = []
|
||||
|
||||
@property
|
||||
def sites(self: MaigretDatabase):
|
||||
def sites(self):
|
||||
return self._sites
|
||||
|
||||
@property
|
||||
def sites_dict(self):
|
||||
return {site.name: site for site in self._sites}
|
||||
|
||||
|
||||
@property
|
||||
def engines(self: MaigretDatabase):
|
||||
def engines(self):
|
||||
return self._engines
|
||||
|
||||
@property
|
||||
def engines_dict(self):
|
||||
return {engine.name: engine for engine in self._engines}
|
||||
|
||||
def load_from_json(self: MaigretDatabase, json_data: dict) -> MaigretDatabase:
|
||||
def update_site(self, site: MaigretSite) -> MaigretDatabase:
|
||||
for s in self._sites:
|
||||
if s.name == site.name:
|
||||
s = site
|
||||
|
||||
return self
|
||||
|
||||
def save_to_file(self, filename: str) -> MaigretDatabase:
|
||||
json_data = {
|
||||
'sites': {site.name: site.strip_engine_data().json for site in self._sites},
|
||||
'engines': {engine.name: engine.json for engine in self._engines},
|
||||
}
|
||||
|
||||
json_data = json.dumps(json_data, indent=4)
|
||||
|
||||
with open(filename, 'w') as f:
|
||||
f.write(json_data)
|
||||
|
||||
return self
|
||||
|
||||
|
||||
def load_from_json(self, json_data: dict) -> MaigretDatabase:
|
||||
# Add all of site information from the json file to internal site list.
|
||||
site_data = json_data.get("sites")
|
||||
engines_data = json_data.get("engines")
|
||||
@@ -121,25 +166,11 @@ class MaigretDatabase:
|
||||
|
||||
for site_name in site_data:
|
||||
try:
|
||||
site = {}
|
||||
site_user_info = site_data[site_name]
|
||||
# If popularity unknown, make site be at bottom of list.
|
||||
popularity_rank = site_user_info.get("rank", sys.maxsize)
|
||||
maigret_site = MaigretSite(site_name, site_data[site_name])
|
||||
|
||||
if 'engine' in site_user_info:
|
||||
engine_info = engines_data[site_user_info['engine']]['site']
|
||||
site.update(engine_info)
|
||||
|
||||
site.update(site_user_info)
|
||||
|
||||
maigret_site = MaigretSite(site_name,
|
||||
site["urlMain"],
|
||||
site["url"],
|
||||
popularity_rank,
|
||||
site["username_claimed"],
|
||||
site["username_unclaimed"],
|
||||
site
|
||||
)
|
||||
engine = site_data[site_name].get('engine')
|
||||
if engine:
|
||||
maigret_site.update_from_engine(self.engines_dict[engine])
|
||||
|
||||
self._sites.append(maigret_site)
|
||||
except KeyError as error:
|
||||
@@ -150,7 +181,7 @@ class MaigretDatabase:
|
||||
return self
|
||||
|
||||
|
||||
def load_from_str(self: MaigretDatabase, db_str: str) -> MaigretDatabase:
|
||||
def load_from_str(self, db_str: str) -> MaigretDatabase:
|
||||
try:
|
||||
data = json.loads(db_str)
|
||||
except Exception as error:
|
||||
@@ -161,7 +192,7 @@ class MaigretDatabase:
|
||||
return self.load_from_json(data)
|
||||
|
||||
|
||||
def load_from_url(self: MaigretDatabase, url: str) -> MaigretDatabase:
|
||||
def load_from_url(self, url: str) -> MaigretDatabase:
|
||||
is_url_valid = url.startswith('http://') or url.startswith('https://')
|
||||
|
||||
if not is_url_valid:
|
||||
@@ -190,7 +221,7 @@ class MaigretDatabase:
|
||||
return self.load_from_json(data)
|
||||
|
||||
|
||||
def load_from_file(self: MaigretDatabase, filename: str) -> MaigretDatabase:
|
||||
def load_from_file(self, filename: str) -> MaigretDatabase:
|
||||
try:
|
||||
with open(filename, 'r', encoding='utf-8') as file:
|
||||
try:
|
||||
@@ -207,7 +238,7 @@ class MaigretDatabase:
|
||||
return self.load_from_json(data)
|
||||
|
||||
|
||||
def site_name_list(self: MaigretDatabase, popularity_rank=False):
|
||||
def site_name_list(self, popularity_rank=False):
|
||||
"""Get Site Name List.
|
||||
|
||||
Keyword Arguments:
|
||||
|
||||
Reference in New Issue
Block a user