Refactoring, sites database tests

This commit is contained in:
Soxoj
2021-01-02 00:23:58 +03:00
parent 863e16b1d9
commit 315ecec39f
5 changed files with 246 additions and 221 deletions
+56 -69
View File
@@ -25,7 +25,7 @@ from socid_extractor import parse, extract
from .notify import QueryNotifyPrint
from .result import QueryResult, QueryStatus
from .sites import SitesInformation
from .sites import MaigretDatabase, MaigretSite
import xmind
@@ -55,7 +55,7 @@ unsupported_characters = '#'
cookies_file = 'cookies.txt'
async def get_response(request_future, social_network, logger):
async def get_response(request_future, site_name, logger):
html_text = None
status_code = 0
@@ -92,7 +92,7 @@ async def get_response(request_future, social_network, logger):
error_text = "Proxy Error"
expection_text = str(err)
except Exception as err:
logger.warning(f'Unhandled error while requesting {social_network}: {err}')
logger.warning(f'Unhandled error while requesting {site_name}: {err}')
logger.debug(err, exc_info=True)
error_text = "Some Error"
expection_text = str(err)
@@ -101,19 +101,19 @@ async def get_response(request_future, social_network, logger):
return html_text, status_code, error_text, expection_text
async def update_site_data_from_response(sitename, site_data, results_info, semaphore, logger, query_notify):
async def update_site_dict_from_response(sitename, site_dict, results_info, semaphore, logger, query_notify):
async with semaphore:
site_obj = site_data[sitename]
future = site_obj.get('request_future')
site_obj = site_dict[sitename]
future = site_obj.request_future
if not future:
# ignore: search by incompatible id type
return
response = await get_response(request_future=future,
social_network=sitename,
site_name=sitename,
logger=logger)
site_data[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj, sitename)
site_dict[sitename] = process_site_result(response, query_notify, logger, results_info, site_obj)
# TODO: move info separate module
@@ -137,13 +137,11 @@ def detect_error_page(html_text, status_code, fail_flags, ignore_403):
return None, None
def process_site_result(response, query_notify, logger, results_info, net_info, social_network):
def process_site_result(response, query_notify, logger, results_info, site: MaigretSite):
if not response:
return results_info
fulltags = []
if ("tags" in net_info.keys()):
fulltags = net_info["tags"]
fulltags = site.tags
# Retrieve other site information again
username = results_info['username']
@@ -157,14 +155,14 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
return results_info
# Get the expected error type
error_type = net_info["errorType"]
error_type = site.check_type
# Get the failure messages and comments
failure_errors = net_info.get("errors", {})
failure_errors = site.errors
# TODO: refactor
if not response:
logger.error(f'No response for {social_network}')
logger.error(f'No response for {site.name}')
return results_info
html_text, status_code, error_text, expection_text = response
@@ -182,37 +180,37 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
if status_code and not error_text:
error_text, site_error_text = detect_error_page(html_text, status_code, failure_errors,
'ignore_403' in net_info)
site.ignore_403)
# presense flags
# True by default
presense_flags = net_info.get("presenseStrs", [])
presense_flags = site.presense_strs
is_presense_detected = html_text and all(
[(presense_flag in html_text) for presense_flag in presense_flags]) or not presense_flags
if error_text is not None:
logger.debug(error_text)
result = QueryResult(username,
social_network,
site.name,
url,
QueryStatus.UNKNOWN,
query_time=response_time,
context=f'{error_text}: {site_error_text}', tags=fulltags)
elif error_type == "message":
absence_flags = net_info.get("errorMsg")
absence_flags = site.absence_strs
is_absence_flags_list = isinstance(absence_flags, list)
absence_flags_set = set(absence_flags) if is_absence_flags_list else {absence_flags}
# Checks if the error message is in the HTML
is_absence_detected = any([(absence_flag in html_text) for absence_flag in absence_flags_set])
if not is_absence_detected and is_presense_detected:
result = QueryResult(username,
social_network,
site.name,
url,
QueryStatus.CLAIMED,
query_time=response_time, tags=fulltags)
else:
result = QueryResult(username,
social_network,
site.name,
url,
QueryStatus.AVAILABLE,
query_time=response_time, tags=fulltags)
@@ -220,13 +218,13 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
# Checks if the status code of the response is 2XX
if (not status_code >= 300 or status_code < 200) and is_presense_detected:
result = QueryResult(username,
social_network,
site.name,
url,
QueryStatus.CLAIMED,
query_time=response_time, tags=fulltags)
else:
result = QueryResult(username,
social_network,
site.name,
url,
QueryStatus.AVAILABLE,
query_time=response_time, tags=fulltags)
@@ -238,20 +236,20 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
# forward to some odd redirect).
if 200 <= status_code < 300 and is_presense_detected:
result = QueryResult(username,
social_network,
site.name,
url,
QueryStatus.CLAIMED,
query_time=response_time, tags=fulltags)
else:
result = QueryResult(username,
social_network,
site.name,
url,
QueryStatus.AVAILABLE,
query_time=response_time, tags=fulltags)
else:
# It should be impossible to ever get here...
raise ValueError(f"Unknown Error Type '{error_type}' for "
f"site '{social_network}'")
f"site '{site_name}'")
extracted_ids_data = {}
@@ -259,7 +257,7 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
try:
extracted_ids_data = extract(html_text)
except Exception as e:
logger.warning(f'Error while parsing {social_network}: {e}', exc_info=True)
logger.warning(f'Error while parsing {site_name}: {e}', exc_info=True)
if extracted_ids_data:
new_usernames = {}
@@ -272,22 +270,21 @@ def process_site_result(response, query_notify, logger, results_info, net_info,
results_info['ids_usernames'] = new_usernames
result.ids_data = extracted_ids_data
is_similar = net_info.get('similarSearch', False)
# Notify caller about results of query.
query_notify.update(result, is_similar)
query_notify.update(result, site.similar_search)
# Save status of request
results_info['status'] = result
# Save results from request
results_info['http_status'] = status_code
results_info['is_similar'] = is_similar
results_info['is_similar'] = site.similar_search
# results_site['response_text'] = html_text
results_info['rank'] = net_info.get('rank', 0)
results_info['rank'] = site.popularity_rank
return results_info
async def maigret(username, site_data, query_notify, logger,
async def maigret(username, site_dict, query_notify, logger,
proxy=None, timeout=None, recursive_search=False,
id_type='username', tags=None, debug=False, forced=False,
max_connections=100):
@@ -298,7 +295,7 @@ async def maigret(username, site_data, query_notify, logger,
Keyword Arguments:
username -- String indicating username that report
should be created against.
site_data -- Dictionary containing all of the site data.
site_dict -- Dictionary containing all of the site data.
query_notify -- Object with base type of QueryNotify().
This will be used to notify the caller about
query results.
@@ -345,21 +342,19 @@ async def maigret(username, site_data, query_notify, logger,
results_total = {}
# First create futures for all requests. This allows for the requests to run in parallel
for social_network, net_info in site_data.items():
for site_name, site in site_dict.items():
fulltags = []
if ("tags" in net_info.keys()):
fulltags = net_info["tags"]
fulltags = site.tags
if net_info.get('type', 'username') != id_type:
if site.type != id_type:
continue
site_tags = set(net_info.get('tags', []))
site_tags = set(fulltags)
if tags:
if not set(tags).intersection(site_tags):
continue
if 'disabled' in net_info and net_info['disabled'] and not forced:
if site.disabled and not forced:
continue
# Results from analysis of this specific site
@@ -368,32 +363,29 @@ async def maigret(username, site_data, query_notify, logger,
# Record URL of main site and username
results_site['username'] = username
results_site['parsing_enabled'] = recursive_search
results_site['url_main'] = net_info.get("urlMain")
results_site['url_main'] = site.url_main
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11.1; rv:55.0) Gecko/20100101 Firefox/55.0',
}
if "headers" in net_info:
# Override/append any extra headers required by a given site.
headers.update(net_info["headers"])
headers.update(site.headers)
# URL of user on site (if it exists)
url = net_info.get('url').format(
urlMain=net_info['urlMain'],
urlSubpath=net_info.get('urlSubpath', ''),
url = site.url_username_format.format(
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username
)
# workaround to prevent slash errors
url = url.replace('///', '/')
# Don't make request if username is invalid for the site
regex_check = net_info.get("regexCheck")
if regex_check and re.search(regex_check, username) is None:
if site.regex_check and re.search(site.regex_check, username) is None:
# No need to do the check at the site: this user name is not allowed.
results_site['status'] = QueryResult(username,
social_network,
site_name,
url,
QueryStatus.ILLEGAL)
results_site["url_user"] = ""
@@ -403,7 +395,7 @@ async def maigret(username, site_data, query_notify, logger,
else:
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = net_info.get("urlProbe")
url_probe = site.url_probe
if url_probe is None:
# Probe URL is normal one seen by people out on the web.
url_probe = url
@@ -411,13 +403,13 @@ async def maigret(username, site_data, query_notify, logger,
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(
urlMain=net_info['urlMain'],
urlSubpath=net_info.get('urlSubpath', ''),
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username,
)
if net_info["errorType"] == 'status_code' and net_info.get("request_head_only", True):
if site.check_type == 'status_code' and site.request_head_only:
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
@@ -428,7 +420,7 @@ async def maigret(username, site_data, query_notify, logger,
# not respond properly unless we request the whole page.
request_method = session.get
if net_info["errorType"] == "response_url":
if site.check_type == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
# http status from the original URL request.
@@ -454,10 +446,11 @@ async def maigret(username, site_data, query_notify, logger,
)
# Store future in data for access later
net_info["request_future"] = future
# TODO: move to separate obj
site.request_future = future
# Add this site's results into final dictionary with all of the other results.
results_total[social_network] = results_site
results_total[site_name] = results_site
# TODO: move into top-level function
@@ -465,7 +458,7 @@ async def maigret(username, site_data, query_notify, logger,
tasks = []
for sitename, result_obj in results_total.items():
update_site_coro = update_site_data_from_response(sitename, site_data, result_obj, sem, logger, query_notify)
update_site_coro = update_site_dict_from_response(sitename, site_dict, result_obj, sem, logger, query_notify)
future = asyncio.ensure_future(update_site_coro)
tasks.append(future)
@@ -553,8 +546,9 @@ async def site_self_check(site_name, site_data, logger):
async def self_check(json_file, logger):
data = json.load(open(json_file))
sites = SitesInformation(json_file)
db = MaigretDatabase()
db.load_from_file(json_file)
sites = db.sites
all_sites = {}
def disabled_count(data):
@@ -825,18 +819,11 @@ async def main():
# Create object with all information about sites we are aware of.
try:
sites = SitesInformation(args.json_file)
site_data_all = MaigretDatabase().load_from_file(args.json_file).sites_dict
except Exception as error:
print(f"ERROR: {error}")
sys.exit(1)
# Create original dictionary from SitesInformation() object.
# Eventually, the rest of the code will be updated to use the new object
# directly, but this will glue the two pieces together.
site_data_all = {}
for site in sites:
site_data_all[site.name] = site.information
if args.site_list is None:
# Not desired to look at a sub-set of sites
site_data = site_data_all
@@ -868,7 +855,7 @@ async def main():
site_data[site] = site_dataCpy.get(site)
# Database consistency
enabled_count = len(list(filter(lambda x: not x.get('disabled', False), site_data.values())))
enabled_count = len(list(filter(lambda x: not x.disabled, site_data.values())))
print(f'Sites in database, enabled/total: {enabled_count}/{len(site_data)}')
# Create notify object for query results.
+26 -25
View File
@@ -1,6 +1,7 @@
{
"engines": {
"XenForo": {
"presenseStrs": ["XenForo"],
"site": {
"errorMsg": [
"The specified member cannot be found. Please enter a member's entire name.",
@@ -11,7 +12,7 @@
"errors": {
"You must be logged-in to do that.": "Login required"
},
"url": "{urlMain}/members/?username={username}"
"url": "{urlMain}{urlSubpath}/members/?username={username}"
}
},
"phpBB": {
@@ -46,7 +47,7 @@
"The administrator has banned your IP address": "IP ban",
"\u0418\u0437\u0432\u0438\u043d\u0438\u0442\u0435, \u0441\u0435\u0440\u0432\u0435\u0440 \u043f\u0435\u0440\u0435\u0433\u0440\u0443\u0436\u0435\u043d. \u041f\u043e\u0436\u0430\u043b\u0443\u0439\u0441\u0442\u0430, \u043f\u043e\u043f\u0440\u043e\u0431\u0443\u0439\u0442\u0435 \u0437\u0430\u0439\u0442\u0438 \u043f\u043e\u0437\u0436\u0435.": "Server is overloaded"
},
"url": "{urlMain}/{urlSubpath}/member.php?username={username}"
"url": "{urlMain}{urlSubpath}/member.php?username={username}"
}
}
},
@@ -6845,7 +6846,7 @@
"ru"
],
"urlMain": "https://www.infrance.su/",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -7987,7 +7988,7 @@
"ru"
],
"urlMain": "https://la.mail.ru",
"urlSubpath": "forums",
"urlSubpath": "/forums",
"username_claimed": "wizard",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -8554,7 +8555,7 @@
"ru"
],
"urlMain": "https://minecraftonly.ru",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -8639,7 +8640,7 @@
"us"
],
"urlMain": "https://www.mobile-files.com/",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -8819,7 +8820,7 @@
"pk"
],
"urlMain": "https://www.movie-list.com",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -8859,7 +8860,7 @@
"us"
],
"urlMain": "https://www.mpgh.net/",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -9873,7 +9874,7 @@
"engine": "vBulletin",
"rank": 4840375,
"urlMain": "http://p38forum.com",
"urlSubpath": "forums",
"urlSubpath": "/forums",
"username_claimed": "red",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -10193,7 +10194,7 @@
"ru"
],
"urlMain": "https://pw.mail.ru/",
"urlSubpath": "forums",
"urlSubpath": "/forums",
"username_claimed": "wizard",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -10216,7 +10217,7 @@
"ru"
],
"urlMain": "http://pesiq.ru/",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -11233,7 +11234,7 @@
"music"
],
"urlMain": "http://www.rap-royalty.com",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "red",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -11365,7 +11366,7 @@
"ru"
],
"urlMain": "http://www.redorchestra.ru",
"urlSubpath": "forums",
"urlSubpath": "/forums",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -11484,7 +11485,7 @@
"ru"
],
"urlMain": "https://rev.mail.ru",
"urlSubpath": "forums",
"urlSubpath": "/forums",
"username_claimed": "wizard",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -11523,7 +11524,7 @@
"ru"
],
"urlMain": "https://www.rlocman.ru",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "elnat",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -11700,7 +11701,7 @@
"us"
],
"urlMain": "https://www.rpgwatch.com",
"urlSubpath": "forums",
"urlSubpath": "/forums",
"username_claimed": "blue",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -11821,7 +11822,7 @@
"ru"
],
"urlMain": "http://www.russian.fi/",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -12532,7 +12533,7 @@
"ru"
],
"urlMain": "https://solaris-club.net",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -12802,7 +12803,7 @@
"ru"
],
"urlMain": "http://statistika.ru",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "hamam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -12900,7 +12901,7 @@
"ru"
],
"urlMain": "https://www.stratege.ru",
"urlSubpath": "forums",
"urlSubpath": "/forums",
"username_claimed": "blue",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -13199,7 +13200,7 @@
"ru"
],
"urlMain": "https://tanks.mail.ru",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "red",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -13751,7 +13752,7 @@
"in"
],
"urlMain": "https://www.trainsim.com/",
"urlSubpath": "vbts",
"urlSubpath": "/vbts",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -13986,7 +13987,7 @@
"ru"
],
"urlMain": "http://tv-games.ru/",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "adam",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -14632,7 +14633,7 @@
"ru"
],
"urlMain": "https://wf.mail.ru",
"urlSubpath": "forums",
"urlSubpath": "/forums",
"username_claimed": "wizard",
"username_unclaimed": "noonewouldeverusethis7"
},
@@ -14961,7 +14962,7 @@
"us"
],
"urlMain": "http://wirednewyork.com/",
"urlSubpath": "forum",
"urlSubpath": "/forum",
"username_claimed": "blue",
"username_unclaimed": "noonewouldeverusethis7"
},
+114 -126
View File
@@ -1,8 +1,5 @@
"""Sherlock Sites Information Module
This module supports storing information about web sites.
This is the raw data that will be used to search for usernames.
"""
"""Maigret Sites Information"""
from __future__ import annotations
import json
import operator
import sys
@@ -10,8 +7,14 @@ import sys
import requests
class SiteInformation():
def __init__(self, name, url_home, url_username_format, popularity_rank,
class MaigretEngine:
    """A named check-engine definition (e.g. "XenForo") from the sites database.

    Engine properties may arrive either as a positional dict (as passed by
    MaigretDatabase.load_from_json) or as keyword arguments; both are
    attached to the instance as attributes.
    """

    def __init__(self, name, *args, **kwargs):
        self.name = name
        # Bug fix: load_from_json passes the engine's JSON properties as a
        # single positional dict, which ``*args`` previously swallowed and
        # dropped. Absorb positional dicts as well as keyword arguments.
        for props in args:
            if isinstance(props, dict):
                self.__dict__.update(props)
        self.__dict__.update(kwargs)
class MaigretSite:
def __init__(self, name, url_main, url_username_format, popularity_rank,
username_claimed, username_unclaimed,
information):
"""Create Site Information Object.
@@ -21,7 +24,7 @@ class SiteInformation():
Keyword Arguments:
self -- This object.
name -- String which identifies site.
url_home -- String containing URL for home of site.
url_main -- String containing URL for home of site.
url_username_format -- String containing URL for Username format
on site.
NOTE: The string should contain the
@@ -55,7 +58,7 @@ class SiteInformation():
"""
self.name = name
self.url_home = url_home
self.url_main = url_main
self.url_username_format = url_username_format
if (popularity_rank is None) or (popularity_rank == 0):
@@ -66,105 +69,56 @@ class SiteInformation():
self.username_claimed = username_claimed
self.username_unclaimed = username_unclaimed
self.information = information
self.disabled = information.get('disabled', False)
self.similar_search = information.get('similarSearch', False)
self.ignore_403 = information.get('ignore_403', False)
self.tags = information.get('tags', [])
self.type = information.get('type', 'username')
self.headers = information.get('headers', {})
self.errors = information.get('errors', {})
self.url_subpath = information.get('urlSubpath', '')
self.regex_check = information.get('regexCheck', None)
self.url_probe = information.get('urlProbe', None)
self.check_type = information.get('errorType', '')
self.request_head_only = information.get('request_head_only', '')
self.presense_strs = information.get('presenseStrs', [])
self.absence_strs = information.get('errorMsg', [])
self.request_future = None
return
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
return f"{self.name} ({self.url_home})"
return f"{self.name} ({self.url_main})"
class SitesInformation():
def __init__(self, data_file_path=None):
"""Create Sites Information Object.
class MaigretDatabase:
def __init__(self):
self._sites = []
self._engines = []
Contains information about all supported web sites.
@property
def sites(self: MaigretDatabase):
return self._sites
Keyword Arguments:
self -- This object.
data_file_path -- String which indicates path to data file.
The file name must end in ".json".
@property
def sites_dict(self):
return {site.name: site for site in self._sites}
There are 3 possible formats:
* Absolute File Format
For example, "c:/stuff/data.json".
* Relative File Format
The current working directory is used
as the context.
For example, "data.json".
* URL Format
For example,
"https://example.com/data.json", or
"http://example.com/data.json".
An exception will be thrown if the path
to the data file is not in the expected
format, or if there was any problem loading
the file.
@property
def engines(self: MaigretDatabase):
return self._engines
If this option is not specified, then a
default site list will be used.
Return Value:
Nothing.
"""
# Ensure that specified data file has correct extension.
if ".json" != data_file_path[-5:].lower():
raise FileNotFoundError(f"Incorrect JSON file extension for "
f"data file '{data_file_path}'."
)
if (("http://" == data_file_path[:7].lower()) or
("https://" == data_file_path[:8].lower())
):
# Reference is to a URL.
try:
response = requests.get(url=data_file_path)
except Exception as error:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file URL '{data_file_path}': "
f"{str(error)}"
)
if response.status_code == 200:
try:
site_data = response.json()
except Exception as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': {str(error)}."
)
else:
raise FileNotFoundError(f"Bad response while accessing "
f"data file URL '{data_file_path}'."
)
else:
# Reference is to a file.
try:
with open(data_file_path, "r", encoding="utf-8") as file:
try:
data = json.load(file)
site_data = data.get("sites")
engines_data = data.get("engines")
except Exception as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': {str(error)}."
)
except FileNotFoundError as error:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file '{data_file_path}'."
)
self.sites = {}
def load_from_json(self: MaigretDatabase, json_data: dict) -> MaigretDatabase:
# Add all of site information from the json file to internal site list.
site_data = json_data.get("sites")
engines_data = json_data.get("engines")
for engine_name in engines_data:
self._engines.append(MaigretEngine(engine_name, engines_data[engine_name]))
for site_name in site_data:
try:
site = {}
@@ -178,8 +132,7 @@ class SitesInformation():
site.update(site_user_info)
self.sites[site_name] = \
SiteInformation(site_name,
maigret_site = MaigretSite(site_name,
site["urlMain"],
site["url"],
popularity_rank,
@@ -187,15 +140,74 @@ class SitesInformation():
site["username_unclaimed"],
site
)
self._sites.append(maigret_site)
except KeyError as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}' for site {site_name}: "
raise ValueError(f"Problem parsing json content for site {site_name}: "
f"Missing attribute {str(error)}."
)
return
return self
def site_name_list(self, popularity_rank=False):
def load_from_str(self: MaigretDatabase, db_str: str) -> MaigretDatabase:
    """Parse a JSON string and load its engines/sites into this database.

    Raises ValueError (with a truncated preview of the input) when the
    string is not valid JSON; returns self for call chaining.
    """
    try:
        parsed = json.loads(db_str)
    except Exception as error:
        raise ValueError(f"Problem parsing json contents from str"
                         f"'{db_str[:50]}'...: {str(error)}."
                         )

    return self.load_from_json(parsed)
def load_from_url(self: MaigretDatabase, url: str) -> MaigretDatabase:
    """Fetch the database JSON over HTTP(S) and load it into this database.

    Returns False for non-HTTP(S) URLs. Raises FileNotFoundError on
    network failure or a non-200 response, ValueError on malformed JSON.
    """
    if not (url.startswith('http://') or url.startswith('https://')):
        return False

    try:
        response = requests.get(url=url)
    except Exception as error:
        raise FileNotFoundError(f"Problem while attempting to access "
                                f"data file URL '{url}': "
                                f"{str(error)}"
                                )

    if response.status_code != 200:
        raise FileNotFoundError(f"Bad response while accessing "
                                f"data file URL '{url}'."
                                )

    try:
        data = response.json()
    except Exception as error:
        raise ValueError(f"Problem parsing json contents at "
                         f"'{url}': {str(error)}."
                         )

    return self.load_from_json(data)
def load_from_file(self: MaigretDatabase, filename: str) -> MaigretDatabase:
    """Load engines/sites from a JSON file into this database.

    Raises ValueError when the file contains malformed JSON and
    FileNotFoundError when the file cannot be read; both messages name
    the offending file (the previous messages printed a placeholder
    instead of the filename). Returns self for call chaining.
    """
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            try:
                data = json.load(file)
            except Exception as error:
                # Name the actual file so the user can locate the bad input.
                raise ValueError(f"Problem parsing json contents from "
                                 f"file '{filename}': {str(error)}."
                                 ) from error
    except FileNotFoundError as error:
        # Chain the original OS error for debuggability.
        raise FileNotFoundError(f"Problem while attempting to access "
                                f"data file '{filename}'."
                                ) from error

    return self.load_from_json(data)
def site_name_list(self: MaigretDatabase, popularity_rank=False):
"""Get Site Name List.
Keyword Arguments:
@@ -223,27 +235,3 @@ class SitesInformation():
site_names = sorted([site.name for site in self], key=str.lower)
return site_names
def __iter__(self):
"""Iterator For Object.
Keyword Arguments:
self -- This object.
Return Value:
Iterator for sites object.
"""
for site_name in self.sites:
yield self.sites[site_name]
def __len__(self):
"""Length For Object.
Keyword Arguments:
self -- This object.
Return Value:
Length of sites object.
"""
return len(self.sites)
View File
+49
View File
@@ -0,0 +1,49 @@
"""Maigret Database test functions"""
from maigret.sites import MaigretDatabase
def test_load_empty_db_from_str():
    """A JSON string with empty engines/sites yields an empty database."""
    database = MaigretDatabase()
    database.load_from_str('{"engines": {}, "sites": {}}')

    assert database.sites == []
    assert database.engines == []
def test_load_valid_db():
    """A minimal engines+sites JSON dict populates both collections."""
    engines = {
        "XenForo": {
            "presenseStrs": ["XenForo"],
            "site": {
                "errorMsg": [
                    "The specified member cannot be found. Please enter a member's entire name.",
                ],
                "errorType": "message",
                "errors": {
                    "You must be logged-in to do that.": "Login required"
                },
                "url": "{urlMain}{urlSubpath}/members/?username={username}"
            }
        },
    }
    sites = {
        "Amperka": {
            "engine": "XenForo",
            "rank": 121613,
            "tags": [
                "ru"
            ],
            "urlMain": "http://forum.amperka.ru",
            "username_claimed": "adam",
            "username_unclaimed": "noonewouldeverusethis7"
        },
    }

    database = MaigretDatabase()
    database.load_from_json({'engines': engines, 'sites': sites})

    assert len(database.sites) == 1
    assert len(database.engines) == 1
    assert database.sites[0].name == 'Amperka'
    assert database.engines[0].name == 'XenForo'