From d2309396dbe2a28bc8cf4782641733ad4f41675c Mon Sep 17 00:00:00 2001
From: Stefan Zabka
Date: Mon, 15 Jun 2020 11:41:24 +0200
Subject: [PATCH 01/28] Added mode parameter to read_table

---
 openwpm_utils/crawlhistory.py | 48 ++++++++++++++++++++++
 openwpm_utils/s3.py           | 75 +++++++++++++++++++++++++----------
 2 files changed, 101 insertions(+), 22 deletions(-)
 create mode 100644 openwpm_utils/crawlhistory.py

diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py
new file mode 100644
index 0000000..125b555
--- /dev/null
+++ b/openwpm_utils/crawlhistory.py
@@ -0,0 +1,48 @@
+import pyspark.sql.functions as F
+from pyspark.sql.types import StringType
+
+
+def reduce_to_worst_command_status(statuses):
+    """Takes a list of command_statuses and returns the worst of them
+    signature: List[str] -> str
+    """
+    if "critical" in statuses:
+        return "critical"
+    if "error" in statuses:
+        return "error"
+    if "neterror" in statuses:
+        return "neterror"
+    if "timeout" in statuses:
+        return "timeout"
+    return "ok"
+
+
+udf_reduce_to_worst_command_status = F.udf(reduce_to_worst_command_status, StringType())
+
+
+def reduce_to_best_command_status(statuses):
+    """Takes a list of command_statuses and returns the worst of them
+    signature: List[str] -> str
+    """
+    if "ok" in statuses:
+        return "ok"
+    if "timeout" in statuses:
+        return "timeout"
+    if "neterror" in statuses:
+        return "neterror"
+    if "error" in statuses:
+        return "error"
+    return "critical"
+
+
+udf_reduce_to_best_command_status = F.udf(reduce_to_worst_command_status, StringType())
+
+
+def get_worst_status_per_visit_id(crawl_history):
+    return crawl_history.groupBy("visit_id").agg(
+        udf_reduce_to_worst_command_status(F.collect_list("command_status")).alias(
+            "worst_status"
+        )
+    )
+
+
diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py
index dac87ca..f453541 100644
--- a/openwpm_utils/s3.py
+++ b/openwpm_utils/s3.py
@@ -8,11 +8,14 @@
 from botocore.exceptions import ClientError
 from pyarrow.filesystem import S3FSWrapper  # noqa
 from pyspark.sql import SQLContext
+import pyspark.sql.functions as F
+
+
+from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
 
 
 class PySparkS3Dataset(object):
-    def __init__(self, spark_context, s3_directory,
-                 s3_bucket='openwpm-crawls'):
+    def __init__(self, spark_context, s3_directory, s3_bucket="openwpm-crawls"):
         """Helper class to load OpenWPM datasets from S3 using PySpark
 
         Parameters
@@ -29,10 +32,8 @@ def __init__(self, spark_context, s3_directory,
         self._s3_directory = s3_directory
         self._spark_context = spark_context
         self._sql_context = SQLContext(spark_context)
-        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
-            s3_bucket, s3_directory)
-        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
-            s3_bucket, s3_directory)
+        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (s3_bucket, s3_directory)
+        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (s3_bucket, s3_directory)
 
     def read_table(self, table_name, columns=None):
         """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
@@ -55,8 +56,7 @@ def read_content(self, content_hash):
         NOTE: This can only be run in the driver process since it requires
         access to the spark context
         """
-        return self._spark_context.textFile(
-            self._s3_content_loc % content_hash)
+        return self._spark_context.textFile(self._s3_content_loc % content_hash)
 
     def collect_content(self, content_hash, beautify=False):
         """Collect content for `content_hash` to driver
@@ -64,14 +64,14 @@ def collect_content(self, content_hash, beautify=False):
         NOTE: This can only be run in the driver process since it requires
         access to the spark context
         """
-        content = ''.join(self.read_content(content_hash).collect())
+        content = "".join(self.read_content(content_hash).collect())
         if beautify:
             return jsbeautifier.beautify(content)
         return content
 
 
 class S3Dataset(object):
-    def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
+    def __init__(self, s3_directory, s3_bucket="openwpm-crawls"):
         """Helper class to load OpenWPM datasets from S3 using pandas
 
         This dataset wrapper is safe to use by spark worker processes, as it
@@ -89,8 +89,17 @@ def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
         self._s3_table_loc = "%s/%s/visits/%%s" % (s3_bucket, s3_directory)
         self._content_key = "%s/content/%%s.gz" % s3_directory
         self._s3fs = s3fs.S3FileSystem()
-
-    def read_table(self, table_name, columns=None):
+        self._incomplete_visit_ids = self.read_table(
+            "incomplete_visits", mode="all"
+        ).select("visit_id")
+        crawl_history = self.read_table("crawl_history", mode="all")
+        self._failed_visit_ids = (
+            get_worst_status_per_visit_id(crawl_history)
+            .where(F.col("worst_status") != "ok")
+            .select("visit_id")
+        )
+
+    def read_table(self, table_name, columns=None, mode="successful"):
         """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
 
         Parameters
@@ -99,31 +108,53 @@ def read_table(self, table_name, columns=None):
             OpenWPM table to read
         columns : list of strings
             The set of columns to filter the parquet dataset by
+        mode : string
+            The valid values are "successful", "failed", "all"
+            Success is determined per visit_id. A visit_id is failed
+            if one of it's commands failed or if it's in the interrupted table
         """
-        return pq.ParquetDataset(
-            self._s3_table_loc % table_name,
-            filesystem=self._s3fs,
-            metadata_nthreads=4
-        ).read(use_pandas_metadata=True, columns=columns).to_pandas()
+        df = (
+            pq.ParquetDataset(
+                self._s3_table_loc % table_name,
+                filesystem=self._s3fs,
+                metadata_nthreads=4,
+            )
+            .read(use_pandas_metadata=True, columns=columns)
+            .to_pandas()
+        )
+        if mode == "all":
+            return df
+        if mode == "failed":
+            return df.join(self._failed_visit_ids, "visit_id", how="inner").union(
+                df.join(self._incomplete_visit_ids, "visit_id", how="inner")
+            )
+        if mode == "successful":
+            return df.join(self._failed_visit_ids, "visit_id", how="leftanti").join(
+                self._incomplete_visit_ids, "visit_id", how="leftanti"
+            )
+        else:
+            raise AssertionError(
+                f"Mode was ${mode},"
+                "allowed modes are 'all', 'failed' and 'successful'"
+            )
 
     def collect_content(self, content_hash, beautify=False):
         """Collect content by directly connecting to S3 via boto3"""
-        s3 = boto3.client('s3')
+        s3 = boto3.client("s3")
         try:
             obj = s3.get_object(
-                Bucket=self._s3_bucket,
-                Key=self._content_key % content_hash
+                Bucket=self._s3_bucket, Key=self._content_key % content_hash
             )
             body = obj["Body"]
             compressed_content = six.BytesIO(body.read())
             body.close()
         except ClientError as e:
-            if e.response['Error']['Code'] != 'NoSuchKey':
+            if e.response["Error"]["Code"] != "NoSuchKey":
                 raise
             else:
                 return None
 
-        with gzip.GzipFile(fileobj=compressed_content, mode='r') as f:
+        with gzip.GzipFile(fileobj=compressed_content, mode="r") as f:
             content = f.read()
 
         if content is None or content == "":

From 24df34c600f4032791af0b7c5be8f9da3714ab5a Mon Sep 17 00:00:00 2001
From: Stefan Zabka
Date: Mon, 15 Jun 2020 12:22:30 +0200
Subject: [PATCH 02/28] Added mode to PySparkS3Dataset.read_table

---
 openwpm_utils/s3.py | 29 +++++++++++++++++++++++++++--
 setup.py            |  2 +-
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py
index f453541..d78556f 100644
--- a/openwpm_utils/s3.py
+++ b/openwpm_utils/s3.py
@@ -34,8 +34,18 @@ def __init__(self, spark_context, s3_directory, s3_bucket="openwpm-crawls"):
         self._sql_context = SQLContext(spark_context)
         self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (s3_bucket, s3_directory)
         self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (s3_bucket, s3_directory)
+        self._incomplete_visit_ids = self.read_table(
+            "incomplete_visits", mode="all"
+        ).select("visit_id")
+        crawl_history = self.read_table("crawl_history", mode="all")
+        self._failed_visit_ids = (
+            get_worst_status_per_visit_id(crawl_history)
+            .where(F.col("worst_status") != "ok")
+            .select("visit_id")
+        )
 
-    def read_table(self, table_name, columns=None):
+
+    def read_table(self, table_name, columns=None, mode = "successful"):
         """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
Parameters @@ -47,7 +57,22 @@ def read_table(self, table_name, columns=None): """ table = self._sql_context.read.parquet(self._s3_table_loc % table_name) if columns is not None: - return table.select(columns) + table = table.select(columns) + if mode == "all": + return table + if mode == "failed": + return table.join(self._failed_visit_ids, "visit_id", how="inner").union( + table.join(self._incomplete_visit_ids, "visit_id", how="inner") + ) + if mode == "successful": + return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( + self._incomplete_visit_ids, "visit_id", how="leftanti" + ) + else: + raise AssertionError( + f"Mode was ${mode}," + "allowed modes are 'all', 'failed' and 'successful'" + ) return table def read_content(self, content_hash): diff --git a/setup.py b/setup.py index 134558f..3056eae 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ name='openwpm-utils', license='MPL 2.0', url='https://github.com/mozilla/openwpm-utils', - version='0.2.0', + version='0.10.0', packages=['openwpm_utils'], # Dependencies From 348a819ff7277a59c62362b8cf54dd01fbbac91c Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 15 Jun 2020 16:58:20 +0200 Subject: [PATCH 03/28] updated get_option_dict --- openwpm_utils/blocklist.py | 94 ++++++++++++++++++++++++++++++++++---- requirements.txt | 1 + 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/openwpm_utils/blocklist.py b/openwpm_utils/blocklist.py index 4b3e3b6..5c73b2a 100644 --- a/openwpm_utils/blocklist.py +++ b/openwpm_utils/blocklist.py @@ -1,10 +1,57 @@ -from six.moves.urllib.parse import urlparse +from urllib.parse import urlparse +import domain_utils as du -from .domain import get_ps_plus_1 +import abp_blocklist_parser +import requests +import pyspark.sql.functions as F +from pyspark.sql.types import * -def get_option_dict(url, top_level_url, content_type): - """Build an options dict for BlockListParser +# Download blocklists +raw_lists = { + 'nocoin': 'https://raw.githubusercontent.com/hoshsadiq/adblock-nocoin-list/master/nocoin.txt', + 'ublock-resource-abuse': 'https://raw.githubusercontent.com/uBlockOrigin/uAssets/master/filters/resource-abuse.txt' +} +# Initialize parsers +blockers = [ + abp_blocklist_parser.BlockListParser(requests.get(raw_lists["nocoin"]).content.decode()), + abp_blocklist_parser.BlockListParser(requests.get(raw_lists["ublock-resource-abuse"]).content.decode()) +] +# Mapping from +# https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/webRequest/ResourceType +# To +# https://help.eyeo.com/en/adblockplus/how-to-write-filters#options + +type_to_option = { +"beacon":"ping", +"csp_report":"other", +"font":"font", +"image":"image", +"imageset":"image", +"main_frame":"document", +"media":"media", +"object":"object", +"object_subrequest":"object", +"ping":"ping", +"script":"script", +"speculative":"other", +"stylesheet":"stylesheet", +"sub_frame":"subdocument", +"web_manifest":"other", +"websocket":"websocket", +"xbl":"other", +"xml_dtd":"other", +"xmlhttprequest":"xmlhttprequest", +"xslt":"other", +"other":"other" +} + +def get_option_dict(url, top_level_url, resource_type=None): + """Build an options dict for BlockListParser. 
+ + These options are checked here: + * https://github.com/englehardt/abp-blocklist-parser/blob/40f6bb5b91ea403b7b9852a16d6c57d5ec26cf7f/abp_blocklist_parser/RegexParser.py#L104-L117 + * https://github.com/englehardt/abp-blocklist-parser/blob/40f6bb5b91ea403b7b9852a16d6c57d5ec26cf7f/abp_blocklist_parser/RegexParser.py#L240-L248 Parameters ---------- @@ -12,9 +59,8 @@ def get_option_dict(url, top_level_url, content_type): The URL of the requested resource. top_level_url : string The URL of the top-level frame of the requested resource - content_type : int - An integer representing the content type of the load. Mapping given in: - https://searchfox.org/mozilla-central/source/dom/base/nsIContentPolicy.idl + resource_type : string + All possible values are here https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/webRequest/ResourceType Returns ------- @@ -22,8 +68,36 @@ def get_option_dict(url, top_level_url, content_type): An "options" dictionary for use with BlockListParser """ options = {} - options["image"] = content_type == 3 - options["script"] = content_type == 2 + # Add type option. Value doesn't matter. + if resource_type: + try: + options[type_to_option[resource_type]] = True + except KeyError: + raise ValueError( + "Argument %s given for `resource_type` not found in map." % resource_type + ) options["domain"] = urlparse(top_level_url).hostname - options["third-party"] = get_ps_plus_1(url) != get_ps_plus_1(top_level_url) + + # Add third-party option if third party. Value doesn't matter. + if du.get_ps_plus_1(url) != du.get_ps_plus_1(top_level_url): + options["third-party"] = True return options + +def get_matching_rules(url, top_level_url, resource_type): + # skip top-level requests + if top_level_url is None: + return + + matching_rules = set() + options = get_option_dict(url, top_level_url, resource_type) + + for blocker in blockers: + result = blocker.should_block_with_items(url, options=options) + if result is not None and result[0] == 'blacklisted': + matching_rules = matching_rules.union(result[1]) + if len(matching_rules) > 0: + return tuple(matching_rules) + return + + +udf_get_matching_rules = F.udf(get_matching_rules, ArrayType(StringType())) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f8bc17c..fc5de55 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ pyarrow pyspark s3fs six +abp-blocklist-parser From 84c13b76d53f18c54d4b65a94fc6c9eac5769662 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Thu, 25 Jun 2020 00:10:06 +0200 Subject: [PATCH 04/28] Verified mapping --- openwpm_utils/blocklist.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/openwpm_utils/blocklist.py b/openwpm_utils/blocklist.py index 5c73b2a..91dc730 100644 --- a/openwpm_utils/blocklist.py +++ b/openwpm_utils/blocklist.py @@ -21,6 +21,13 @@ # https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/webRequest/ResourceType # To # https://help.eyeo.com/en/adblockplus/how-to-write-filters#options +# With help from +# https://github.com/gorhill/uBlock/blob/1d5800629aaca0d152127d844ca7f6cf975f2f68/src/js/static-net-filtering.js#L55 +# https://github.com/gorhill/uBlock/blob/1d5800629aaca0d152127d844ca7f6cf975f2f68/src/js/static-net-filtering.js#L107 +# Import both objects and use +# let directMapping = {} +# Object.entries(typeNameToTypeValue).forEach(k => {directMapping[k[0]]= typeValueToTypeName[k[1] >>4]}) +# to generate an object that represents the direct mapping between these types type_to_option = { 
"beacon":"ping", From 0e275a9b4ea2601b470b09d4e6001048d8f1a5d2 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Thu, 25 Jun 2020 01:04:39 +0200 Subject: [PATCH 05/28] Getting blockers to work without files --- openwpm_utils/blocklist.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openwpm_utils/blocklist.py b/openwpm_utils/blocklist.py index 91dc730..45b440b 100644 --- a/openwpm_utils/blocklist.py +++ b/openwpm_utils/blocklist.py @@ -14,8 +14,8 @@ } # Initialize parsers blockers = [ - abp_blocklist_parser.BlockListParser(requests.get(raw_lists["nocoin"]).content.decode()), - abp_blocklist_parser.BlockListParser(requests.get(raw_lists["ublock-resource-abuse"]).content.decode()) + abp_blocklist_parser.BlockListParser(regexes=requests.get(raw_lists["nocoin"]).content.decode()), + abp_blocklist_parser.BlockListParser(regexes=requests.get(raw_lists["ublock-resource-abuse"]).content.decode()) ] # Mapping from # https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/webRequest/ResourceType From 4c182a93fb4d4a70939d18aa5fbf9dc73ea6ec22 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 6 Jul 2020 12:38:53 +0200 Subject: [PATCH 06/28] Wrapped get_matching rules to bind blockers in scope --- openwpm_utils/blocklist.py | 95 ++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 51 deletions(-) diff --git a/openwpm_utils/blocklist.py b/openwpm_utils/blocklist.py index 45b440b..bf27f7e 100644 --- a/openwpm_utils/blocklist.py +++ b/openwpm_utils/blocklist.py @@ -1,25 +1,16 @@ from urllib.parse import urlparse import domain_utils as du +from typing import List -import abp_blocklist_parser import requests import pyspark.sql.functions as F from pyspark.sql.types import * +from abp_blocklist_parser import BlockListParser -# Download blocklists -raw_lists = { - 'nocoin': 'https://raw.githubusercontent.com/hoshsadiq/adblock-nocoin-list/master/nocoin.txt', - 'ublock-resource-abuse': 'https://raw.githubusercontent.com/uBlockOrigin/uAssets/master/filters/resource-abuse.txt' -} -# Initialize parsers -blockers = [ - abp_blocklist_parser.BlockListParser(regexes=requests.get(raw_lists["nocoin"]).content.decode()), - abp_blocklist_parser.BlockListParser(regexes=requests.get(raw_lists["ublock-resource-abuse"]).content.decode()) -] # Mapping from # https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/webRequest/ResourceType -# To +# To # https://help.eyeo.com/en/adblockplus/how-to-write-filters#options # With help from # https://github.com/gorhill/uBlock/blob/1d5800629aaca0d152127d844ca7f6cf975f2f68/src/js/static-net-filtering.js#L55 @@ -30,29 +21,30 @@ # to generate an object that represents the direct mapping between these types type_to_option = { -"beacon":"ping", -"csp_report":"other", -"font":"font", -"image":"image", -"imageset":"image", -"main_frame":"document", -"media":"media", -"object":"object", -"object_subrequest":"object", -"ping":"ping", -"script":"script", -"speculative":"other", -"stylesheet":"stylesheet", -"sub_frame":"subdocument", -"web_manifest":"other", -"websocket":"websocket", -"xbl":"other", -"xml_dtd":"other", -"xmlhttprequest":"xmlhttprequest", -"xslt":"other", -"other":"other" + "beacon": "ping", + "csp_report": "other", + "font": "font", + "image": "image", + "imageset": "image", + "main_frame": "document", + "media": "media", + "object": "object", + "object_subrequest": "object", + "ping": "ping", + "script": "script", + "speculative": "other", + "stylesheet": "stylesheet", + "sub_frame": 
"subdocument", + "web_manifest": "other", + "websocket": "websocket", + "xbl": "other", + "xml_dtd": "other", + "xmlhttprequest": "xmlhttprequest", + "xslt": "other", + "other": "other", } + def get_option_dict(url, top_level_url, resource_type=None): """Build an options dict for BlockListParser. @@ -81,30 +73,31 @@ def get_option_dict(url, top_level_url, resource_type=None): options[type_to_option[resource_type]] = True except KeyError: raise ValueError( - "Argument %s given for `resource_type` not found in map." % resource_type + "Argument %s given for `resource_type` not found in map." + % resource_type ) options["domain"] = urlparse(top_level_url).hostname - + # Add third-party option if third party. Value doesn't matter. if du.get_ps_plus_1(url) != du.get_ps_plus_1(top_level_url): options["third-party"] = True return options -def get_matching_rules(url, top_level_url, resource_type): - # skip top-level requests - if top_level_url is None: - return - - matching_rules = set() - options = get_option_dict(url, top_level_url, resource_type) - - for blocker in blockers: - result = blocker.should_block_with_items(url, options=options) - if result is not None and result[0] == 'blacklisted': - matching_rules = matching_rules.union(result[1]) - if len(matching_rules) > 0: - return tuple(matching_rules) - return +def prepare_get_matching_rules(blockers: List[BlockListParser]): + def get_matching_rules(url, top_level_url, resource_type): + # skip top-level requests + if top_level_url is None: + return + + matching_rules = set() + options = get_option_dict(url, top_level_url, resource_type) + + for blocker in blockers: + result = blocker.should_block_with_items(url, options=options) + if result is not None and result[0] == "blacklisted": + matching_rules = matching_rules.union(result[1]) + if len(matching_rules) > 0: + return tuple(matching_rules) -udf_get_matching_rules = F.udf(get_matching_rules, ArrayType(StringType())) \ No newline at end of file + return F.udf(get_matching_rules, ArrayType(StringType())) From fabb2f7109ecc296f3dad8de423664ed39e23650 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 6 Jul 2020 13:01:17 +0200 Subject: [PATCH 07/28] Removed filtering from S3Datasets --- openwpm_utils/s3.py | 28 ++-------------------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index d78556f..65b9293 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -114,17 +114,8 @@ def __init__(self, s3_directory, s3_bucket="openwpm-crawls"): self._s3_table_loc = "%s/%s/visits/%%s" % (s3_bucket, s3_directory) self._content_key = "%s/content/%%s.gz" % s3_directory self._s3fs = s3fs.S3FileSystem() - self._incomplete_visit_ids = self.read_table( - "incomplete_visits", mode="all" - ).select("visit_id") - crawl_history = self.read_table("crawl_history", mode="all") - self._failed_visit_ids = ( - get_worst_status_per_visit_id(crawl_history) - .where(F.col("worst_status") != "ok") - .select("visit_id") - ) - def read_table(self, table_name, columns=None, mode="successful"): + def read_table(self, table_name, columns=None): """Read `table_name` from OpenWPM dataset into a pyspark dataframe. Parameters @@ -138,7 +129,7 @@ def read_table(self, table_name, columns=None, mode="successful"): Success is determined per visit_id. 
A visit_id is failed if one of it's commands failed or if it's in the interrupted table """ - df = ( + return ( pq.ParquetDataset( self._s3_table_loc % table_name, filesystem=self._s3fs, @@ -147,21 +138,6 @@ def read_table(self, table_name, columns=None, mode="successful"): .read(use_pandas_metadata=True, columns=columns) .to_pandas() ) - if mode == "all": - return df - if mode == "failed": - return df.join(self._failed_visit_ids, "visit_id", how="inner").union( - df.join(self._incomplete_visit_ids, "visit_id", how="inner") - ) - if mode == "successful": - return df.join(self._failed_visit_ids, "visit_id", how="leftanti").join( - self._incomplete_visit_ids, "visit_id", how="leftanti" - ) - else: - raise AssertionError( - f"Mode was ${mode}," - "allowed modes are 'all', 'failed' and 'successful'" - ) def collect_content(self, content_hash, beautify=False): """Collect content by directly connecting to S3 via boto3""" From f63989e75ca55871768c8de3213aef8f08aa7d69 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 6 Jul 2020 16:47:47 +0200 Subject: [PATCH 08/28] Removed udfs --- openwpm_utils/crawlhistory.py | 59 +++++++++++------------------------ openwpm_utils/s3.py | 11 ++++--- 2 files changed, 25 insertions(+), 45 deletions(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 125b555..10e025a 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -1,48 +1,27 @@ import pyspark.sql.functions as F from pyspark.sql.types import StringType - -def reduce_to_worst_command_status(statuses): - """Takes a list of command_statuses and returns the worst of them - signature: List[str] -> str - """ - if "critical" in statuses: - return "critical" - if "error" in statuses: - return "error" - if "neterror" in statuses: - return "neterror" - if "timeout" in statuses: - return "timeout" - return "ok" - - -udf_reduce_to_worst_command_status = F.udf(reduce_to_worst_command_status, StringType()) - - -def reduce_to_best_command_status(statuses): - """Takes a list of command_statuses and returns the worst of them - signature: List[str] -> str - """ - if "ok" in statuses: - return "ok" - if "timeout" in statuses: - return "timeout" - if "neterror" in statuses: - return "neterror" - if "error" in statuses: - return "error" - return "critical" - - -udf_reduce_to_best_command_status = F.udf(reduce_to_worst_command_status, StringType()) +reduce_to_worst_command_status = ( + F.when(F.array_contains("command_status", "critical"), "critical") + .when(F.array_contains("command_status", "error"), "error") + .when(F.array_contains("command_status", "neterror"), "neterror") + .when(F.array_contains("command_status", "timeout"), "timeout") + .otherwise("ok") + .alias("worst_status") +) + + +reduce_to_best_command_status = ( + F.when(F.array_contains("command_status", "critical"), "critical") + .when(F.array_contains("command_status", "ok"), "ok") + .when(F.array_contains("command_status", "timeout"), "timeout") + .when(F.array_contains("command_status", "neterror"), "neterror") + .otherwise("critical") + .alias("worst_status") +) def get_worst_status_per_visit_id(crawl_history): return crawl_history.groupBy("visit_id").agg( - udf_reduce_to_worst_command_status(F.collect_list("command_status")).alias( - "worst_status" - ) + reduce_to_worst_command_status(F.collect_list("command_status")) ) - - diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index 65b9293..0e7e87a 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -1,21 +1,21 @@ import gzip +from 
typing import List import boto3 import jsbeautifier import pyarrow.parquet as pq +import pyspark.sql.functions as F import s3fs import six from botocore.exceptions import ClientError from pyarrow.filesystem import S3FSWrapper # noqa from pyspark.sql import SQLContext -import pyspark.sql.functions as F - from openwpm_utils.crawlhistory import get_worst_status_per_visit_id class PySparkS3Dataset(object): - def __init__(self, spark_context, s3_directory, s3_bucket="openwpm-crawls"): + def __init__(self, spark_context, s3_directory: str, s3_bucket:str="openwpm-crawls"): """Helper class to load OpenWPM datasets from S3 using PySpark Parameters @@ -44,8 +44,9 @@ def __init__(self, spark_context, s3_directory, s3_bucket="openwpm-crawls"): .select("visit_id") ) - - def read_table(self, table_name, columns=None, mode = "successful"): + def read_table( + self, table_name: str, columns: List[str] = None, mode: str = "successful" + ): """Read `table_name` from OpenWPM dataset into a pyspark dataframe. Parameters From 505bcf6931c649e1dede6e9bebe57d3f175919c5 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 6 Jul 2020 16:52:33 +0200 Subject: [PATCH 09/28] Moved comment to the appropriate place --- openwpm_utils/s3.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index 0e7e87a..4f69732 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -55,6 +55,10 @@ def read_table( OpenWPM table to read columns : list of strings The set of columns to filter the parquet dataset by + mode : string + The valid values are "successful", "failed", "all" + Success is determined per visit_id. A visit_id is failed + if one of it's commands failed or if it's in the interrupted table """ table = self._sql_context.read.parquet(self._s3_table_loc % table_name) if columns is not None: @@ -125,10 +129,6 @@ def read_table(self, table_name, columns=None): OpenWPM table to read columns : list of strings The set of columns to filter the parquet dataset by - mode : string - The valid values are "successful", "failed", "all" - Success is determined per visit_id. 
A visit_id is failed - if one of it's commands failed or if it's in the interrupted table """ return ( pq.ParquetDataset( From 14e279d223c77be47fb5c46a97468f52624d8d52 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 13 Jul 2020 16:19:30 +0200 Subject: [PATCH 10/28] We shouldn't try to follow OpenWPMs versions --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3056eae..992a45e 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ name='openwpm-utils', license='MPL 2.0', url='https://github.com/mozilla/openwpm-utils', - version='0.10.0', + version='0.3.0', packages=['openwpm_utils'], # Dependencies From d07eed707bf0a094fdf1cb7b327dcc6f7e73685a Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 20 Jul 2020 16:16:18 +0200 Subject: [PATCH 11/28] Backported fixes in get_worst_status_per_visit_id --- openwpm_utils/crawlhistory.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 10e025a..9dfaf1e 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -22,6 +22,7 @@ def get_worst_status_per_visit_id(crawl_history): - return crawl_history.groupBy("visit_id").agg( - reduce_to_worst_command_status(F.collect_list("command_status")) - ) + """Adds column `worst_status`""" + return (crawl_history.groupBy("visit_id") + .agg(F.collect_list("command_status").alias("command_status")) + .withColumn("worst_status",reduce_to_worst_command_status)) From a0be666e9620a8ca4fc51849df4d3ad7b87e0f29 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 20 Jul 2020 16:33:44 +0200 Subject: [PATCH 12/28] Fixed reduce_to_best_command_status --- openwpm_utils/crawlhistory.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 9dfaf1e..3799f47 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -12,12 +12,12 @@ reduce_to_best_command_status = ( - F.when(F.array_contains("command_status", "critical"), "critical") - .when(F.array_contains("command_status", "ok"), "ok") + F.when(F.array_contains("command_status", "ok"), "ok") .when(F.array_contains("command_status", "timeout"), "timeout") .when(F.array_contains("command_status", "neterror"), "neterror") + .when(F.array_contains("command_status", "error"), "error") .otherwise("critical") - .alias("worst_status") + .alias("best_status") ) From be6aa2668650259203ed7f02fee38c0646fc936f Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 15 Jun 2020 11:57:44 +0200 Subject: [PATCH 13/28] Added display_crawl_results --- openwpm_utils/crawlhistory.py | 85 +++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 3799f47..34eca5b 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -1,5 +1,7 @@ import pyspark.sql.functions as F from pyspark.sql.types import StringType +import json + reduce_to_worst_command_status = ( F.when(F.array_contains("command_status", "critical"), "critical") @@ -26,3 +28,86 @@ def get_worst_status_per_visit_id(crawl_history): return (crawl_history.groupBy("visit_id") .agg(F.collect_list("command_status").alias("command_status")) .withColumn("worst_status",reduce_to_worst_command_status)) + + +# TODO: This needs a name that expresses that we are giving general stats about the +# way the crawl ran and not it's specific data +def 
display_crawl_results(crawl_history, interrupted_visits): + """ + Analyze crawl_history and interrupted_visits to display general + success statistics + This function should be given the all entries in the crawl_history and + interrupted_visits tableq + """ + crawl_history.groupBy("command").count().show() + + total_num_command_sequences = crawl_history.groupBy("visit_id").count() + visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) + print( + "Percentage of command_sequence that didn't complete successfully %0.2f%%" + % ( + visit_id_and_worst_status.where(F.col("worst_status") != "ok").count() + / float(total_num_command_sequences) + * 100 + ) + ) + net_error_count = visit_id_and_worst_status.where( + F.col("worst_status") == "neterror" + ).count() + print( + "There were a total of %d neterrors(%0.2f%% of the all command_sequences)" + % (net_error_count, net_error_count / float(total_num_command_sequences) * 100) + ) + timeout_count = visit_id_and_worst_status.where( + F.col("worst_status") == "timeout" + ).count() + print( + "There were a total of %d timeouts(%0.2f%% of the all command_sequences)" + % (timeout_count, timeout_count / float(total_num_command_sequences) * 100) + ) + + error_count = visit_id_and_worst_status.where( + F.col("worst_status") == "error" + ).count() + print( + "There were a total of %d errors(%0.2f%% of the all command_sequences)" + % (error_count, error_count / float(total_num_command_sequences) * 100) + ) + + print( + f"A total of ${interrupted_visits.count()} were interrupted." + f"This represents ${interrupted_visits.count()/ float(total_num_command_sequences)* 100} % of the entire crawl" + ) + + def extract_website_from_arguments(arguments): + """Given the arguments of a get_command this function returns which website was visited""" + return json.loads(arguments)["url"] + + udf_extract_website_from_arguments = F.udf( + extract_website_from_arguments, StringType() + ) + + visit_id_to_website = crawl_history.where( + F.col("command") == "GetCommand" + ).withColumn("website", udf_extract_website_from_arguments("arguments")) + visit_id_to_website = visit_id_to_website[["visit_id", "website"]] + + visit_id_website_status = visit_id_and_worst_status.join( + visit_id_to_website, "visit_id" + ) + multiple_successes = ( + visit_id_website_status.where(F.col("worst_status") == "ok") + .join(interrupted_visits, "visit_id", how="leftanti") + .groupBy("website") + .count() + .filter("count > 1") + .orderBy(F.desc("count")) + ) + + print( + f"There were {multiple_successes.count()} websites that were successfully visited multiple times" + ) + multiple_successes.groupBy( + F.col("count").alias("Number of successes") + ).count().show() + multiple_successes.filter("count > 2").show() From 5b24f6fe2d694832a263f7bfbe49a500201123eb Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 15 Jun 2020 15:02:54 +0200 Subject: [PATCH 14/28] Rewrote crawlhistory.py --- openwpm_utils/crawlhistory.py | 59 ++++++++++++++++++++++++++++++----- openwpm_utils/dataquality.py | 54 ++++++++++++++++++++++++++++++++ setup.cfg | 3 ++ 3 files changed, 108 insertions(+), 8 deletions(-) create mode 100644 openwpm_utils/dataquality.py diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 34eca5b..f445a4d 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -1,7 +1,7 @@ -import pyspark.sql.functions as F -from pyspark.sql.types import StringType import json +import pyspark.sql.functions as F +from pyspark.sql.types import 
StringType reduce_to_worst_command_status = ( F.when(F.array_contains("command_status", "critical"), "critical") @@ -30,9 +30,7 @@ def get_worst_status_per_visit_id(crawl_history): .withColumn("worst_status",reduce_to_worst_command_status)) -# TODO: This needs a name that expresses that we are giving general stats about the -# way the crawl ran and not it's specific data -def display_crawl_results(crawl_history, interrupted_visits): +def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits): """ Analyze crawl_history and interrupted_visits to display general success statistics @@ -41,7 +39,8 @@ def display_crawl_results(crawl_history, interrupted_visits): """ crawl_history.groupBy("command").count().show() - total_num_command_sequences = crawl_history.groupBy("visit_id").count() + # Analyzing status per command_sequence + total_num_command_sequences = crawl_history.groupBy("visit_id").count().count() visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) print( "Percentage of command_sequence that didn't complete successfully %0.2f%%" @@ -75,10 +74,15 @@ def display_crawl_results(crawl_history, interrupted_visits): ) print( - f"A total of ${interrupted_visits.count()} were interrupted." - f"This represents ${interrupted_visits.count()/ float(total_num_command_sequences)* 100} % of the entire crawl" + f"A total of {interrupted_visits.count()} command_sequences were interrupted." + f"This represents {interrupted_visits.count()/ float(total_num_command_sequences)* 100:.2f} % of the entire crawl" ) + +def display_crawl_history_per_website(crawl_history, interrupted_visits): + # Analyzing status per website + visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) + def extract_website_from_arguments(arguments): """Given the arguments of a get_command this function returns which website was visited""" return json.loads(arguments)["url"] @@ -90,11 +94,50 @@ def extract_website_from_arguments(arguments): visit_id_to_website = crawl_history.where( F.col("command") == "GetCommand" ).withColumn("website", udf_extract_website_from_arguments("arguments")) + visit_id_to_website = visit_id_to_website[["visit_id", "website"]] visit_id_website_status = visit_id_and_worst_status.join( visit_id_to_website, "visit_id" ) + best_status_per_website = visit_id_website_status.groupBy("website").agg( + udf_reduce_to_best_command_status(F.collect_list("worst_status")).alias( + "best_status" + ) + ) + total_number_websites = best_status_per_website.count() + print(f"There was an attempt to visit a total of {total_number_websites} websites") + + print( + "Percentage of websites that didn't complete successfully %0.2f%%" + % ( + best_status_per_website.where(F.col("best_status") != "ok").count() + / float(total_number_websites) + * 100 + ) + ) + net_error_count = best_status_per_website.where( + F.col("best_status") == "neterror" + ).count() + print( + "There were a total of %d neterrors (%0.2f%% of the all websites)" + % (net_error_count, net_error_count / float(total_number_websites) * 100) + ) + timeout_count = best_status_per_website.where( + F.col("best_status") == "timeout" + ).count() + print( + "There were a total of %d timeouts (%0.2f%% of the all websites)" + % (timeout_count, timeout_count / float(total_number_websites) * 100) + ) + + error_count = best_status_per_website.where(F.col("best_status") == "error").count() + + print( + "There were a total of %d errors (%0.2f%% of the websites)" + % (error_count, error_count / float(total_number_websites) 
* 100) + ) + multiple_successes = ( visit_id_website_status.where(F.col("worst_status") == "ok") .join(interrupted_visits, "visit_id", how="leftanti") diff --git a/openwpm_utils/dataquality.py b/openwpm_utils/dataquality.py new file mode 100644 index 0000000..d414a01 --- /dev/null +++ b/openwpm_utils/dataquality.py @@ -0,0 +1,54 @@ +from pyspark.sql.functions import col, countDistinct, isnan, lit, sum + + +def count_not_null(c, nan_as_null=False): + """Use conversion between boolean and integer + - False -> 0 + - True -> 1 + TODO: add `blank_as_null` + """ + pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True)) + return sum(pred.cast("integer")).alias(c) + + +def count_null(c, nan_as_null=False): + """Use conversion between boolean and integer + - False -> 0 + - True -> 1 + TODO: add `blank_as_null` + """ + pred = col(c).isNull() | (isnan(c) if nan_as_null else lit(False)) + return sum(pred.cast("integer")).alias(c) + + +def print_distinct_counts(df, col_name): + print( + "Number of distinct %s %d" + % (col_name, df.agg(countDistinct(col(col_name))).collect()[0][0]) + ) + + +def print_total_counts(df, col_name): + print("Total number of %s %d" % (col_name, df.select(col(col_name)).count())) + + +def check_df(df, skip_null_check=True): + """A set of generic checks to run on each table""" + print("Total number of records: %d" % df.count()) + + for item in ["visit_id", "instance_id"]: + print_distinct_counts(df, item) + + # Count of nulls + if not skip_null_check: + print("\nColumns with > 0 number of nulls / NaN values:") + for c in df.columns: + count = df.agg(count_null(c)).collect()[0][0] + if count > 0: + print("* %-20s | %10d" % (c, count)) + + # Count of bad visit ids (default when not available in extension) + print( + "\nNumber of records with visit_id == -1: %d" + % df.where(df.visit_id == -1).count() + ) diff --git a/setup.cfg b/setup.cfg index d93c4f5..3162fbc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,3 +4,6 @@ test=pytest [tool:pytest] addopts = --flake8 -rw testpaths = tests + +[flake8] +max-line-length = 88 \ No newline at end of file From 937285b03b2ed92a06b76c603b893dee352c4c07 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 15 Jun 2020 15:04:44 +0200 Subject: [PATCH 15/28] Removed dataquality.py --- openwpm_utils/dataquality.py | 54 ------------------------------------ 1 file changed, 54 deletions(-) delete mode 100644 openwpm_utils/dataquality.py diff --git a/openwpm_utils/dataquality.py b/openwpm_utils/dataquality.py deleted file mode 100644 index d414a01..0000000 --- a/openwpm_utils/dataquality.py +++ /dev/null @@ -1,54 +0,0 @@ -from pyspark.sql.functions import col, countDistinct, isnan, lit, sum - - -def count_not_null(c, nan_as_null=False): - """Use conversion between boolean and integer - - False -> 0 - - True -> 1 - TODO: add `blank_as_null` - """ - pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True)) - return sum(pred.cast("integer")).alias(c) - - -def count_null(c, nan_as_null=False): - """Use conversion between boolean and integer - - False -> 0 - - True -> 1 - TODO: add `blank_as_null` - """ - pred = col(c).isNull() | (isnan(c) if nan_as_null else lit(False)) - return sum(pred.cast("integer")).alias(c) - - -def print_distinct_counts(df, col_name): - print( - "Number of distinct %s %d" - % (col_name, df.agg(countDistinct(col(col_name))).collect()[0][0]) - ) - - -def print_total_counts(df, col_name): - print("Total number of %s %d" % (col_name, df.select(col(col_name)).count())) - - -def check_df(df, skip_null_check=True): 
- """A set of generic checks to run on each table""" - print("Total number of records: %d" % df.count()) - - for item in ["visit_id", "instance_id"]: - print_distinct_counts(df, item) - - # Count of nulls - if not skip_null_check: - print("\nColumns with > 0 number of nulls / NaN values:") - for c in df.columns: - count = df.agg(count_null(c)).collect()[0][0] - if count > 0: - print("* %-20s | %10d" % (c, count)) - - # Count of bad visit ids (default when not available in extension) - print( - "\nNumber of records with visit_id == -1: %d" - % df.where(df.visit_id == -1).count() - ) From d8eb4bbb378cd9e1ca5babd3a0a13c71f45b1c50 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 13 Jul 2020 16:36:41 +0200 Subject: [PATCH 16/28] Used typeannotations --- openwpm_utils/crawlhistory.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index f445a4d..68bae8d 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -35,7 +35,7 @@ def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits Analyze crawl_history and interrupted_visits to display general success statistics This function should be given the all entries in the crawl_history and - interrupted_visits tableq + interrupted_visits table """ crawl_history.groupBy("command").count().show() @@ -100,10 +100,10 @@ def extract_website_from_arguments(arguments): visit_id_website_status = visit_id_and_worst_status.join( visit_id_to_website, "visit_id" ) - best_status_per_website = visit_id_website_status.groupBy("website").agg( - udf_reduce_to_best_command_status(F.collect_list("worst_status")).alias( - "best_status" - ) + best_status_per_website = ( + visit_id_website_status.groupBy("website") + .agg(F.collect_list("command_status").alias("command_status")) + .withColumn("best_status",reduce_to_best_command_status) ) total_number_websites = best_status_per_website.count() print(f"There was an attempt to visit a total of {total_number_websites} websites") From 5280711c330217ede0d595826e6f347aeb6c47dc Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 27 Jul 2020 15:38:42 +0200 Subject: [PATCH 17/28] Fixing display_crawl_history --- openwpm_utils/crawlhistory.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 68bae8d..3538b3d 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -102,10 +102,12 @@ def extract_website_from_arguments(arguments): ) best_status_per_website = ( visit_id_website_status.groupBy("website") - .agg(F.collect_list("command_status").alias("command_status")) + .agg(F.collect_list("worst_status").alias("command_status")) .withColumn("best_status",reduce_to_best_command_status) ) + total_number_websites = best_status_per_website.count() + print(f"There was an attempt to visit a total of {total_number_websites} websites") print( @@ -153,4 +155,6 @@ def extract_website_from_arguments(arguments): multiple_successes.groupBy( F.col("count").alias("Number of successes") ).count().show() + + print("A list of all websites that where successfully visited more than twice:") multiple_successes.filter("count > 2").show() From ae05d1ff1b1649347cd9713fd4627d9948849271 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 27 Jul 2020 16:29:06 +0200 Subject: [PATCH 18/28] Added docstrings --- openwpm_utils/crawlhistory.py | 43 ++++++++++++++++++++++++++++++----- 1 file changed, 37 
insertions(+), 6 deletions(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 3538b3d..1f8eca1 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -31,11 +31,24 @@ def get_worst_status_per_visit_id(crawl_history): def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits): - """ - Analyze crawl_history and interrupted_visits to display general - success statistics - This function should be given the all entries in the crawl_history and - interrupted_visits table + """ Analyzes crawl_history and interrupted_visits to display general + success statistics grouped by command_sequence + + Parameters + ---------- + crawl_history: dataframe + The full ``crawl_history`` dataframe + interrupted_visits: dataframe + The full ``interrupted_visits`` dataframe + + Examples + -------- + >>> from openwpm_utils.s3 import PySparkS3Dataset + >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) + >>> crawl_history = dataset.read_table('crawl_history', mode="all") + >>> incomplete = dataset.read_table('incomplete_visits', mode="all") + >>> display_crawl_history_per_command_sequence(crawl_history, incomplete) + """ crawl_history.groupBy("command").count().show() @@ -80,7 +93,25 @@ def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits def display_crawl_history_per_website(crawl_history, interrupted_visits): - # Analyzing status per website + """ Analyzes crawl_history and interrupted_visits to display general + success statistics grouped by website + + Parameters + ---------- + crawl_history: dataframe + The full ``crawl_history`` dataframe + interrupted_visits: dataframe + The full ``interrupted_visits`` dataframe + + Examples + -------- + >>> from openwpm_utils.s3 import PySparkS3Dataset + >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) + >>> crawl_history = dataset.read_table('crawl_history', mode="all") + >>> incomplete = dataset.read_table('incomplete_visits', mode="all") + >>> display_crawl_history_per_website(crawl_history, incomplete) + + """ visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) def extract_website_from_arguments(arguments): From c89dd3ab642bc3eae3efcba0c384dd2b95159811 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 27 Jul 2020 17:08:06 +0200 Subject: [PATCH 19/28] Made PySparkS3Dataset forward to S3Dataset It turns out that opening a file via boto is a lot faster than spark_content.textFile Closes #23 --- openwpm_utils/s3.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index 4f69732..56af90a 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -80,24 +80,10 @@ def read_table( ) return table - def read_content(self, content_hash): - """Read the content corresponding to `content_hash`. 
- - NOTE: This can only be run in the driver process since it requires - access to the spark context - """ - return self._spark_context.textFile(self._s3_content_loc % content_hash) - def collect_content(self, content_hash, beautify=False): - """Collect content for `content_hash` to driver - - NOTE: This can only be run in the driver process since it requires - access to the spark context - """ - content = "".join(self.read_content(content_hash).collect()) - if beautify: - return jsbeautifier.beautify(content) - return content + """Collect content by directly connecting to S3 via boto3""" + non_spark_dataset = S3Dataset(self._s3_directory, self._s3_bucket) + return non_spark_dataset.collect_content(content_hash=content_hash, beautify=beautify) class S3Dataset(object): From 798ea1e4851db66855ef8970db5dbc018af99105 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 27 Jul 2020 19:17:14 +0200 Subject: [PATCH 20/28] Made PySparkS3Dataset a subclass of S3Dataset --- openwpm_utils/s3.py | 136 +++++++++++++++++++++----------------------- 1 file changed, 64 insertions(+), 72 deletions(-) diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index 56af90a..a649345 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -14,78 +14,6 @@ from openwpm_utils.crawlhistory import get_worst_status_per_visit_id -class PySparkS3Dataset(object): - def __init__(self, spark_context, s3_directory: str, s3_bucket:str="openwpm-crawls"): - """Helper class to load OpenWPM datasets from S3 using PySpark - - Parameters - ---------- - spark_context - Spark context. In databricks, this is available via the `sc` - variable. - s3_directory : string - Directory within the S3 bucket in which the dataset is saved. - s3_bucket : string, optional - The bucket name on S3. Defaults to `openwpm-crawls`. - """ - self._s3_bucket = s3_bucket - self._s3_directory = s3_directory - self._spark_context = spark_context - self._sql_context = SQLContext(spark_context) - self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (s3_bucket, s3_directory) - self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (s3_bucket, s3_directory) - self._incomplete_visit_ids = self.read_table( - "incomplete_visits", mode="all" - ).select("visit_id") - crawl_history = self.read_table("crawl_history", mode="all") - self._failed_visit_ids = ( - get_worst_status_per_visit_id(crawl_history) - .where(F.col("worst_status") != "ok") - .select("visit_id") - ) - - def read_table( - self, table_name: str, columns: List[str] = None, mode: str = "successful" - ): - """Read `table_name` from OpenWPM dataset into a pyspark dataframe. - - Parameters - ---------- - table_name : string - OpenWPM table to read - columns : list of strings - The set of columns to filter the parquet dataset by - mode : string - The valid values are "successful", "failed", "all" - Success is determined per visit_id. 
A visit_id is failed - if one of it's commands failed or if it's in the interrupted table - """ - table = self._sql_context.read.parquet(self._s3_table_loc % table_name) - if columns is not None: - table = table.select(columns) - if mode == "all": - return table - if mode == "failed": - return table.join(self._failed_visit_ids, "visit_id", how="inner").union( - table.join(self._incomplete_visit_ids, "visit_id", how="inner") - ) - if mode == "successful": - return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( - self._incomplete_visit_ids, "visit_id", how="leftanti" - ) - else: - raise AssertionError( - f"Mode was ${mode}," - "allowed modes are 'all', 'failed' and 'successful'" - ) - return table - - def collect_content(self, content_hash, beautify=False): - """Collect content by directly connecting to S3 via boto3""" - non_spark_dataset = S3Dataset(self._s3_directory, self._s3_bucket) - return non_spark_dataset.collect_content(content_hash=content_hash, beautify=beautify) - - class S3Dataset(object): def __init__(self, s3_directory, s3_bucket="openwpm-crawls"): """Helper class to load OpenWPM datasets from S3 using pandas @@ -154,3 +82,67 @@ def collect_content(self, content_hash, beautify=False): except IndexError: pass return content + +class PySparkS3Dataset(S3Dataset): + def __init__(self, spark_context, s3_directory: str, s3_bucket:str="openwpm-crawls"): + """Helper class to load OpenWPM datasets from S3 using PySpark + + Parameters + ---------- + spark_context + Spark context. In databricks, this is available via the `sc` + variable. + s3_directory : string + Directory within the S3 bucket in which the dataset is saved. + s3_bucket : string, optional + The bucket name on S3. Defaults to `openwpm-crawls`. + """ + super.__init__(s3_directory, s3_bucket) + self._spark_context = spark_context + self._sql_context = SQLContext(spark_context) + self._s3_table_loc = f"s3a://{self._s3_table_loc}" + self._incomplete_visit_ids = self.read_table( + "incomplete_visits", mode="all" + ).select("visit_id") + crawl_history = self.read_table("crawl_history", mode="all") + self._failed_visit_ids = ( + get_worst_status_per_visit_id(crawl_history) + .where(F.col("worst_status") != "ok") + .select("visit_id") + ) + + def read_table( + self, table_name: str, columns: List[str] = None, mode: str = "successful" + ): + """Read `table_name` from OpenWPM dataset into a pyspark dataframe. + + Parameters + ---------- + table_name : string + OpenWPM table to read + columns : list of strings + The set of columns to filter the parquet dataset by + mode : string + The valid values are "successful", "failed", "all" + Success is determined per visit_id. 
A visit_id is failed + if one of it's commands failed or if it's in the interrupted table + """ + table = self._sql_context.read.parquet(self._s3_table_loc % table_name) + if columns is not None: + table = table.select(columns) + if mode == "all": + return table + if mode == "failed": + return table.join(self._failed_visit_ids, "visit_id", how="inner").union( + table.join(self._incomplete_visit_ids, "visit_id", how="inner") + ) + if mode == "successful": + return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( + self._incomplete_visit_ids, "visit_id", how="leftanti" + ) + else: + raise AssertionError( + f"Mode was ${mode}," + "allowed modes are 'all', 'failed' and 'successful'" + ) + return table From f92da0cf06aeb47a0c59f260c300306e6e5c1ceb Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Fri, 19 Mar 2021 12:02:55 +0100 Subject: [PATCH 21/28] Returning None if something goes wrong during get_matching_rules --- openwpm_utils/blocklist.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/openwpm_utils/blocklist.py b/openwpm_utils/blocklist.py index bf27f7e..3324342 100644 --- a/openwpm_utils/blocklist.py +++ b/openwpm_utils/blocklist.py @@ -77,6 +77,9 @@ def get_option_dict(url, top_level_url, resource_type=None): % resource_type ) options["domain"] = urlparse(top_level_url).hostname + if options["domain"] == None: + # If somehow the top_level_url should be unparseable + return None # Add third-party option if third party. Value doesn't matter. if du.get_ps_plus_1(url) != du.get_ps_plus_1(top_level_url): @@ -92,12 +95,16 @@ def get_matching_rules(url, top_level_url, resource_type): matching_rules = set() options = get_option_dict(url, top_level_url, resource_type) + if options is None: + print(f"Something went wrong when handling {url} on top level URL {top_level_url}") + return for blocker in blockers: result = blocker.should_block_with_items(url, options=options) if result is not None and result[0] == "blacklisted": matching_rules = matching_rules.union(result[1]) - if len(matching_rules) > 0: - return tuple(matching_rules) + + if len(matching_rules) > 0: + return tuple(matching_rules) return F.udf(get_matching_rules, ArrayType(StringType())) From 8cf5823d621c303fc25c3e2376f2239790854912 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Fri, 19 Mar 2021 12:05:28 +0100 Subject: [PATCH 22/28] Added demo file --- tests/test_crawlhistory.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 tests/test_crawlhistory.py diff --git a/tests/test_crawlhistory.py b/tests/test_crawlhistory.py new file mode 100644 index 0000000..bd10f58 --- /dev/null +++ b/tests/test_crawlhistory.py @@ -0,0 +1,26 @@ +from openwpm_utils.crawlhistory import get_worst_status_per_visit_id, reduce_to_worst_command_status + +from collections import namedtuple +import pyspark as spark + +srow = namedtuple('simple_row', 'visit_id command_status'.split()) +data = [ + srow('1', "critical"), + srow('1', "ok"), + srow('2', "ok"), + srow('3', "neterror"), + srow('3', "timeout") +] +data2 = [ + srow('1', ["ok", "critical"]), + srow('2', ["ok"]), + srow('3', ["timeout", "neterror"]), +] + +test_df = spark.createDataFrame(data) +test_df.printSchema() +test_df2 = spark.createDataFrame(data2) +test_df2.printSchema() + + +display(get_worst_status_per_visit_id(test_df)) \ No newline at end of file From 5b5157da91ae0d83d0e717ced68acaa6ae08ff46 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Fri, 19 Mar 2021 16:21:06 +0100 Subject: [PATCH 23/28] GcsDataset 
implementation --- openwpm_utils/gcs.py | 136 +++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 6 +- 2 files changed, 140 insertions(+), 2 deletions(-) create mode 100644 openwpm_utils/gcs.py diff --git a/openwpm_utils/gcs.py b/openwpm_utils/gcs.py new file mode 100644 index 0000000..0d7f6d5 --- /dev/null +++ b/openwpm_utils/gcs.py @@ -0,0 +1,136 @@ +from typing import List, Optional, Union + +import gcsfs +from google.api_core.exceptions import NotFound +import jsbeautifier +import pyarrow.parquet as pq +import pyspark.sql.functions as F +from google.cloud import storage +from pyspark.context import SparkContext +from pyspark.sql import SQLContext + +from openwpm_utils.crawlhistory import get_worst_status_per_visit_id + + +class GCSDataset(object): + def __init__(self, base_dir: str, bucket:Optional[str]="openwpm-data", **kwargs) -> None: + """Helper class to load OpenWPM datasets from GCS using pandas + + This dataset wrapper is safe to use by spark worker processes, as it + does not require the spark context. + + Parameters + ---------- + base_dir + Directory within the GCS bucket in which the dataset is saved. + bucket + The bucket name on GCS. + **kwargs + Passed on to GCSFS so you can customize it to your needs + """ + self._kwargs = kwargs + self._bucket = bucket + self._base_dir = base_dir + self._table_location_format_string = f"{bucket}/{base_dir}/visits/%s" + self._content_key = f"{base_dir}/content/%s.gz" + self._gcsfs = gcsfs.GCSFileSystem(**kwargs) + + def read_table(self, table_name, columns=None): + """Read `table_name` from OpenWPM dataset into a pyspark dataframe. + + Parameters + ---------- + table_name : string + OpenWPM table to read + columns : list of strings + The set of columns to filter the parquet dataset by + """ + return ( + pq.ParquetDataset( + self._table_location_format_string % table_name, + filesystem= self._gcsfs, + metadata_nthreads=4, + ) + .read(use_pandas_metadata=True, columns=columns) + .to_pandas() + ) + + def collect_content(self, content_hash: str, beautify: bool=False) -> Optional[Union[bytes, str]]: + """Collect content by directly connecting to GCS via google.cloud.storage""" + storage_client = storage.Client() + bucket = storage_client.bucket(self._bucket) + + + blob = bucket.blob(self._content_key % content_hash) + content: Union[bytes, str] = blob.download_as_bytes() + + if beautify: + try: + content = jsbeautifier.beautify(content) + except IndexError: + pass + return content + +class PySparkGCSDataset(GCSDataset): + def __init__(self, spark_context: SparkContext, base_dir: str, bucket:str="openwpm-data", **kwargs) -> None: + """Helper class to load OpenWPM datasets from GCS using PySpark + + Parameters + ---------- + spark_context + Spark context. In databricks, this is available via the `sc` + variable. + base_dir : string + Directory within the bucket in which the dataset is saved. + bucket : string, optional + The bucket name on GCS. Defaults to `openwpm-data`. 
+ """ + super().__init__(base_dir, bucket, **kwargs) + self._spark_context = spark_context + self._sql_context = SQLContext(spark_context) + self._table_location_format_string = f"gcs://{self._table_location_format_string}" + self._incomplete_visit_ids = self.read_table( + "incomplete_visits", mode="all" + ).select("visit_id") + crawl_history = self.read_table("crawl_history", mode="all") + self._failed_visit_ids = ( + get_worst_status_per_visit_id(crawl_history) + .where(F.col("worst_status") != "ok") + .select("visit_id") + ) + + def read_table( + self, table_name: str, columns: List[str] = None, mode: str = "successful" + ): + """Read `table_name` from OpenWPM dataset into a pyspark dataframe. + + Parameters + ---------- + table_name : string + OpenWPM table to read + columns : list of strings + The set of columns to filter the parquet dataset by + mode : string + The valid values are "successful", "failed", "all" + Success is determined per visit_id. A visit_id is failed + if one of it's commands failed or if it's in the interrupted table + """ + table = self._sql_context.read.parquet(self._table_location_format_string % table_name) + if columns is not None: + table = table.select(columns) + if mode == "all": + return table + if mode == "failed": + return table.join(self._failed_visit_ids, "visit_id", how="inner").union( + table.join(self._incomplete_visit_ids, "visit_id", how="inner") + ) + if mode == "successful": + return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( + self._incomplete_visit_ids, "visit_id", how="leftanti" + ) + else: + raise AssertionError( + f"Mode was ${mode}," + "allowed modes are 'all', 'failed' and 'successful'" + ) + return table diff --git a/requirements.txt b/requirements.txt index 432baf6..b6fcdb0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,11 @@ +abp-blocklist-parser boto3 domain_utils +gcsfs +google-cloud-storage jsbeautifier pandas plyvel pyarrow pyspark -s3fs -abp-blocklist-parser +s3fs \ No newline at end of file From a8003ffc8a6d4a9b3acc05dc18ac1181feebefd0 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 22 Mar 2021 18:21:06 +0100 Subject: [PATCH 24/28] Changing scheme from gcs to gs --- openwpm_utils/gcs.py | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/openwpm_utils/gcs.py b/openwpm_utils/gcs.py index 0d7f6d5..9d407a5 100644 --- a/openwpm_utils/gcs.py +++ b/openwpm_utils/gcs.py @@ -7,13 +7,16 @@ import pyspark.sql.functions as F from google.cloud import storage from pyspark.context import SparkContext -from pyspark.sql import SQLContext +from pyspark.sql import SQLContext, DataFrame +from pandas import DataFrame as PandasDataFrame from openwpm_utils.crawlhistory import get_worst_status_per_visit_id class GCSDataset(object): - def __init__(self, base_dir: str, bucket:Optional[str]="openwpm-data", **kwargs) -> None: + def __init__( + self, base_dir: str, bucket: Optional[str] = "openwpm-data", **kwargs + ) -> None: """Helper class to load OpenWPM datasets from GCS using pandas This dataset wrapper is safe to use by spark worker processes, as it @@ -35,8 +38,8 @@ def __init__(self, base_dir: str, bucket:Optional[str]="openwpm-data", **kwargs) self._content_key = f"{base_dir}/content/%s.gz" self._gcsfs = gcsfs.GCSFileSystem(**kwargs) - def read_table(self, table_name, columns=None): - """Read `table_name` from OpenWPM dataset into a pyspark dataframe. 
+ def read_table(self, table_name: str, columns: List[str]=None) -> PandasDataFrame: + """Read `table_name` from OpenWPM dataset into a pandas dataframe. Parameters ---------- @@ -48,19 +51,20 @@ def read_table(self, table_name, columns=None): return ( pq.ParquetDataset( self._table_location_format_string % table_name, - filesystem= self._gcsfs, + filesystem=self._gcsfs, metadata_nthreads=4, ) .read(use_pandas_metadata=True, columns=columns) .to_pandas() ) - def collect_content(self, content_hash: str, beautify: bool=False) -> Optional[Union[bytes, str]]: + def collect_content( + self, content_hash: str, beautify: bool = False + ) -> Optional[Union[bytes, str]]: """Collect content by directly connecting to GCS via google.cloud.storage""" storage_client = storage.Client() bucket = storage_client.bucket(self._bucket) - blob = bucket.blob(self._content_key % content_hash) content: Union[bytes, str] = blob.download_as_bytes() @@ -71,8 +75,15 @@ def collect_content(self, content_hash: str, beautify: bool=False) -> Optional[U pass return content + class PySparkGCSDataset(GCSDataset): - def __init__(self, spark_context: SparkContext, base_dir: str, bucket:str="openwpm-data", **kwargs) -> None: + def __init__( + self, + spark_context: SparkContext, + base_dir: str, + bucket: str = "openwpm-data", + **kwargs, + ) -> None: """Helper class to load OpenWPM datasets from GCS using PySpark Parameters @@ -88,7 +99,9 @@ def __init__(self, spark_context: SparkContext, base_dir: str, bucket:str="openw super().__init__(base_dir, bucket, **kwargs) self._spark_context = spark_context self._sql_context = SQLContext(spark_context) - self._table_location_format_string = f"gcs://{self._table_location_format_string}" + self._table_location_format_string = ( + f"gs://{self._table_location_format_string}" + ) self._incomplete_visit_ids = self.read_table( "incomplete_visits", mode="all" ).select("visit_id") @@ -100,8 +113,8 @@ def __init__(self, spark_context: SparkContext, base_dir: str, bucket:str="openw ) def read_table( - self, table_name: str, columns: List[str] = None, mode: str = "successful" - ): + self, table_name: str, columns: Optional[List[str]] = None, mode: str = "successful" + ) -> DataFrame: """Read `table_name` from OpenWPM dataset into a pyspark dataframe. Parameters @@ -115,7 +128,9 @@ def read_table( Success is determined per visit_id. 
A visit_id is failed if one of it's commands failed or if it's in the interrupted table """ - table = self._sql_context.read.parquet(self._table_location_format_string % table_name) + table = self._sql_context.read.parquet( + self._table_location_format_string % table_name + ) if columns is not None: table = table.select(columns) if mode == "all": From ba41239035ad27f533f234e473d046cb129badc4 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Tue, 6 Apr 2021 14:10:51 +0200 Subject: [PATCH 25/28] Big mess --- .pre-commit-config.yaml | 19 ++++++++ openwpm_utils/analysis.py | 43 +++++++++-------- openwpm_utils/blocklist.py | 28 ++++++----- openwpm_utils/crawlhistory.py | 87 +++++++++++++++++++---------------- openwpm_utils/database.py | 68 ++++++++++++++------------- openwpm_utils/dataquality.py | 26 ++++++++++- openwpm_utils/gcs.py | 57 +++++++++++------------ openwpm_utils/s3.py | 46 +++++++++--------- setup.cfg | 17 +++---- setup.py | 47 +++++++++---------- tests/test_crawlhistory.py | 28 ++++++----- 11 files changed, 263 insertions(+), 203 deletions(-) create mode 100644 .pre-commit-config.yaml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..828c504 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,19 @@ +repos: + - repo: https://github.com/timothycrosley/isort + rev: 5.5.1 + hooks: + - id: isort + - repo: https://github.com/psf/black + rev: 20.8b1 + hooks: + - id: black + language_version: python3 + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v0.790 + hooks: + - id: mypy + additional_dependencies: [pytest] + # We may need to add more and more dependencies here, as pre-commit + # runs in an environment without our dependencies + + diff --git a/openwpm_utils/analysis.py b/openwpm_utils/analysis.py index 684a804..3aee3c5 100644 --- a/openwpm_utils/analysis.py +++ b/openwpm_utils/analysis.py @@ -1,10 +1,9 @@ import json from datetime import datetime +from domain_utils import get_ps_plus_1 from pandas import read_sql_query -from .domain import get_ps_plus_1 - def get_set_of_script_hosts_from_call_stack(call_stack): """Return the urls of the scripts involved in the call stack.""" @@ -13,8 +12,7 @@ def get_set_of_script_hosts_from_call_stack(call_stack): return "" stack_frames = call_stack.strip().split("\n") for stack_frame in stack_frames: - script_url = stack_frame.rsplit(":", 2)[0].\ - split("@")[-1].split(" line")[0] + script_url = stack_frame.rsplit(":", 2)[0].split("@")[-1].split(" line")[0] script_urls.add(get_host_from_url(script_url)) return ", ".join(script_urls) @@ -77,25 +75,24 @@ def get_script_urls_from_call_stack_as_set(call_stack): return script_urls stack_frames = call_stack.strip().split("\n") for stack_frame in stack_frames: - script_url = stack_frame.rsplit(":", 2)[0].\ - split("@")[-1].split(" line")[0] + script_url = stack_frame.rsplit(":", 2)[0].split("@")[-1].split(" line")[0] script_urls.add(script_url) return script_urls def add_col_bare_script_url(js_df): """Add a col for script URL without scheme, www and query.""" - js_df['bare_script_url'] =\ - js_df['script_url'].map(strip_scheme_www_and_query) + js_df["bare_script_url"] = js_df["script_url"].map(strip_scheme_www_and_query) def add_col_set_of_script_urls_from_call_stack(js_df): - js_df['stack_scripts'] =\ - js_df['call_stack'].map(get_set_of_script_urls_from_call_stack) + js_df["stack_scripts"] = js_df["call_stack"].map( + get_set_of_script_urls_from_call_stack + ) def add_col_unix_timestamp(df): - df['unix_time_stamp'] = 
df['time_stamp'].map(datetime_from_iso) + df["unix_time_stamp"] = df["time_stamp"].map(datetime_from_iso) def datetime_from_iso(iso_date): @@ -142,43 +139,49 @@ def get_set_cookie(header): def get_responses_from_visits(con, visit_ids): visit_ids_str = "(%s)" % ",".join(str(x) for x in visit_ids) - qry = """SELECT r.id, r.crawl_id, r.visit_id, r.url, + qry = ( + """SELECT r.id, r.crawl_id, r.visit_id, r.url, sv.site_url, sv.first_party, sv.site_rank, r.method, r.referrer, r.headers, r.response_status, r.location, r.time_stamp FROM http_responses as r LEFT JOIN site_visits as sv ON r.visit_id = sv.visit_id - WHERE r.visit_id in %s;""" % visit_ids_str + WHERE r.visit_id in %s;""" + % visit_ids_str + ) return read_sql_query(qry, con) def get_requests_from_visits(con, visit_ids): visit_ids_str = "(%s)" % ",".join(str(x) for x in visit_ids) - qry = """SELECT r.id, r.crawl_id, r.visit_id, r.url, r.top_level_url, + qry = ( + """SELECT r.id, r.crawl_id, r.visit_id, r.url, r.top_level_url, sv.site_url, sv.first_party, sv.site_rank, r.method, r.referrer, r.headers, r.loading_href, r.req_call_stack, r.content_policy_type, r.post_body, r.time_stamp FROM http_requests as r LEFT JOIN site_visits as sv ON r.visit_id = sv.visit_id - WHERE r.visit_id in %s;""" % visit_ids_str + WHERE r.visit_id in %s;""" + % visit_ids_str + ) return read_sql_query(qry, con) def get_set_of_script_ps1s_from_call_stack(script_urls): if len(script_urls): - return ", ".join( - set((get_ps_plus_1(x) or "") for x in script_urls.split(", "))) + return ", ".join(set((get_ps_plus_1(x) or "") for x in script_urls.split(", "))) else: return "" def add_col_set_of_script_ps1s_from_call_stack(js_df): - js_df['stack_script_ps1s'] =\ - js_df['stack_scripts'].map(get_set_of_script_ps1s_from_call_stack) + js_df["stack_script_ps1s"] = js_df["stack_scripts"].map( + get_set_of_script_ps1s_from_call_stack + ) -if __name__ == '__main__': +if __name__ == "__main__": pass diff --git a/openwpm_utils/blocklist.py b/openwpm_utils/blocklist.py index 3324342..352496e 100644 --- a/openwpm_utils/blocklist.py +++ b/openwpm_utils/blocklist.py @@ -1,12 +1,11 @@ +from typing import Any, List, Optional, Set, Tuple from urllib.parse import urlparse -import domain_utils as du -from typing import List -import requests +import domain_utils as du import pyspark.sql.functions as F -from pyspark.sql.types import * - +import requests from abp_blocklist_parser import BlockListParser +from pyspark.sql.types import ArrayType, StringType # Mapping from # https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/webRequest/ResourceType @@ -47,7 +46,7 @@ def get_option_dict(url, top_level_url, resource_type=None): """Build an options dict for BlockListParser. 
- + These options are checked here: * https://github.com/englehardt/abp-blocklist-parser/blob/40f6bb5b91ea403b7b9852a16d6c57d5ec26cf7f/abp_blocklist_parser/RegexParser.py#L104-L117 * https://github.com/englehardt/abp-blocklist-parser/blob/40f6bb5b91ea403b7b9852a16d6c57d5ec26cf7f/abp_blocklist_parser/RegexParser.py#L240-L248 @@ -87,17 +86,21 @@ def get_option_dict(url, top_level_url, resource_type=None): return options -def prepare_get_matching_rules(blockers: List[BlockListParser]): - def get_matching_rules(url, top_level_url, resource_type): +def prepare_get_matching_rules(blockers: List[BlockListParser]) -> Any: + def get_matching_rules( + url: str, top_level_url: str, resource_type: Optional[str] + ) -> Optional[Tuple[Any, ...]]: # skip top-level requests if top_level_url is None: - return + return None - matching_rules = set() + matching_rules: Set[str] = set() options = get_option_dict(url, top_level_url, resource_type) if options is None: - print(f"Something went wrong when handling {url} on top level URL {top_level_url}") - return + print( + f"Something went wrong when handling {url} on top level URL {top_level_url}" + ) + return None for blocker in blockers: result = blocker.should_block_with_items(url, options=options) @@ -106,5 +109,6 @@ def get_matching_rules(url, top_level_url, resource_type): if len(matching_rules) > 0: return tuple(matching_rules) + return None return F.udf(get_matching_rules, ArrayType(StringType())) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 1f8eca1..e1c45c0 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -1,6 +1,7 @@ import json import pyspark.sql.functions as F +from pyspark.sql import DataFrame from pyspark.sql.types import StringType reduce_to_worst_command_status = ( @@ -25,29 +26,33 @@ def get_worst_status_per_visit_id(crawl_history): """Adds column `worst_status`""" - return (crawl_history.groupBy("visit_id") - .agg(F.collect_list("command_status").alias("command_status")) - .withColumn("worst_status",reduce_to_worst_command_status)) - - -def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits): - """ Analyzes crawl_history and interrupted_visits to display general - success statistics grouped by command_sequence - - Parameters - ---------- - crawl_history: dataframe - The full ``crawl_history`` dataframe - interrupted_visits: dataframe - The full ``interrupted_visits`` dataframe - - Examples - -------- - >>> from openwpm_utils.s3 import PySparkS3Dataset - >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) - >>> crawl_history = dataset.read_table('crawl_history', mode="all") - >>> incomplete = dataset.read_table('incomplete_visits', mode="all") - >>> display_crawl_history_per_command_sequence(crawl_history, incomplete) + return ( + crawl_history.groupBy("visit_id") + .agg(F.collect_list("command_status").alias("command_status")) + .withColumn("worst_status", reduce_to_worst_command_status) + ) + + +def display_crawl_history_per_command_sequence( + crawl_history: DataFrame, interrupted_visits: DataFrame +) -> DataFrame: + """Analyzes crawl_history and interrupted_visits to display general + success statistics grouped by command_sequence + + Parameters + ---------- + crawl_history + The full ``crawl_history`` dataframe + interrupted_visits + The full ``interrupted_visits`` dataframe + + Examples + -------- + >>> from openwpm_utils.s3 import PySparkS3Dataset + >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) + 
>>> crawl_history = dataset.read_table('crawl_history', mode="all") + >>> incomplete = dataset.read_table('incomplete_visits', mode="all") + >>> display_crawl_history_per_command_sequence(crawl_history, incomplete) """ crawl_history.groupBy("command").count().show() @@ -92,24 +97,26 @@ def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits ) -def display_crawl_history_per_website(crawl_history, interrupted_visits): - """ Analyzes crawl_history and interrupted_visits to display general - success statistics grouped by website +def display_crawl_history_per_website( + crawl_history: DataFrame, interrupted_visits: DataFrame +) -> None: + """Analyzes crawl_history and interrupted_visits to display general + success statistics grouped by website - Parameters - ---------- - crawl_history: dataframe - The full ``crawl_history`` dataframe - interrupted_visits: dataframe - The full ``interrupted_visits`` dataframe + Parameters + ---------- + crawl_history: dataframe + The full ``crawl_history`` dataframe + interrupted_visits: dataframe + The full ``interrupted_visits`` dataframe - Examples - -------- - >>> from openwpm_utils.s3 import PySparkS3Dataset - >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) - >>> crawl_history = dataset.read_table('crawl_history', mode="all") - >>> incomplete = dataset.read_table('incomplete_visits', mode="all") - >>> display_crawl_history_per_website(crawl_history, incomplete) + Examples + -------- + >>> from openwpm_utils.s3 import PySparkS3Dataset + >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) + >>> crawl_history = dataset.read_table('crawl_history', mode="all") + >>> incomplete = dataset.read_table('incomplete_visits', mode="all") + >>> display_crawl_history_per_website(crawl_history, incomplete) """ visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) @@ -134,7 +141,7 @@ def extract_website_from_arguments(arguments): best_status_per_website = ( visit_id_website_status.groupBy("website") .agg(F.collect_list("worst_status").alias("command_status")) - .withColumn("best_status",reduce_to_best_command_status) + .withColumn("best_status", reduce_to_best_command_status) ) total_number_websites = best_status_per_website.count() diff --git a/openwpm_utils/database.py b/openwpm_utils/database.py index edb34fb..87642b2 100644 --- a/openwpm_utils/database.py +++ b/openwpm_utils/database.py @@ -1,6 +1,7 @@ +import zlib + import jsbeautifier import plyvel -import zlib # SQLite @@ -17,12 +18,12 @@ def fetchiter(cursor, arraysize=10000): def list_placeholder(length, is_pg=False): """Returns a (?,?,?,?...) string of the desired length""" - return '(' + '?,'*(length-1) + '?)' + return "(" + "?," * (length - 1) + "?)" def optimize_db(cursor): """Set options to make sqlite more efficient on a high memory machine""" - cursor.execute("PRAGMA cache_size = -%i" % (0.1 * 10**7)) # 10 GB + cursor.execute("PRAGMA cache_size = -%i" % (0.1 * 10 ** 7)) # 10 GB # Store temp tables, indicies in memory cursor.execute("PRAGMA temp_store = 2") @@ -41,16 +42,18 @@ def build_index(cursor, column, tables): # Script content stored in LevelDB databases by content hash -def get_leveldb(db_path, compression='snappy'): +def get_leveldb(db_path, compression="snappy"): """ Returns an open handle for a leveldb database with proper configuration settings. 
""" - db = plyvel.DB(db_path, - lru_cache_size=10**9, - write_buffer_size=128*10**4, - bloom_filter_bits=128, - compression=compression) + db = plyvel.DB( + db_path, + lru_cache_size=10 ** 9, + write_buffer_size=128 * 10 ** 4, + bloom_filter_bits=128, + compression=compression, + ) return db @@ -69,12 +72,10 @@ def get_url_content(url, sqlite_cur, ldb_con, beautify=True, visit_id=None): visit_id : int (optional) `visit_id` of the page visit where this URL was loaded """ - return get_url_content_with_hash( - url, sqlite_cur, ldb_con, beautify, visit_id)[1] + return get_url_content_with_hash(url, sqlite_cur, ldb_con, beautify, visit_id)[1] -def get_url_content_with_hash(url, sqlite_cur, ldb_con, - beautify=True, visit_id=None): +def get_url_content_with_hash(url, sqlite_cur, ldb_con, beautify=True, visit_id=None): """Return javascript content for given url. Parameters ---------- @@ -92,15 +93,15 @@ def get_url_content_with_hash(url, sqlite_cur, ldb_con, if visit_id is not None: sqlite_cur.execute( "SELECT content_hash FROM http_responses WHERE " - "visit_id = ? AND url = ? LIMIT 1;", (visit_id, url)) + "visit_id = ? AND url = ? LIMIT 1;", + (visit_id, url), + ) else: sqlite_cur.execute( - "SELECT content_hash FROM http_responses WHERE url = ? LIMIT 1;", - (url,)) + "SELECT content_hash FROM http_responses WHERE url = ? LIMIT 1;", (url,) + ) content_hash = sqlite_cur.fetchone() - if (content_hash is None - or len(content_hash) == 0 - or content_hash[0] is None): + if content_hash is None or len(content_hash) == 0 or content_hash[0] is None: return content_hash = content_hash[0] content = get_content(ldb_con, content_hash, beautify=beautify) @@ -109,8 +110,7 @@ def get_url_content_with_hash(url, sqlite_cur, ldb_con, return (content_hash, content) -def get_channel_content(visit_id, channel_id, - sqlite_cur, ldb_con, beautify=True): +def get_channel_content(visit_id, channel_id, sqlite_cur, ldb_con, beautify=True): """Return javascript content for given channel_id. Parameters ---------- @@ -126,11 +126,13 @@ def get_channel_content(visit_id, channel_id, Control weather or not to beautify output """ return get_channel_content_with_hash( - visit_id, channel_id, sqlite_cur, ldb_con, beautify)[1] + visit_id, channel_id, sqlite_cur, ldb_con, beautify + )[1] -def get_channel_content_with_hash(visit_id, channel_id, - sqlite_cur, ldb_con, beautify=True): +def get_channel_content_with_hash( + visit_id, channel_id, sqlite_cur, ldb_con, beautify=True +): """Return javascript content for given channel_id. Parameters ---------- @@ -148,12 +150,10 @@ def get_channel_content_with_hash(visit_id, channel_id, sqlite_cur.execute( "SELECT content_hash FROM http_responses " "WHERE channel_id = ? AND visit_id = ? 
LIMIT 1;", - (channel_id, visit_id) + (channel_id, visit_id), ) content_hash = sqlite_cur.fetchone() - if (content_hash is None - or len(content_hash) == 0 - or content_hash[0] is None): + if content_hash is None or len(content_hash) == 0 or content_hash[0] is None: return content_hash = content_hash[0] content = get_content(ldb_con, content_hash, beautify=beautify) @@ -162,7 +162,7 @@ def get_channel_content_with_hash(visit_id, channel_id, return (content_hash, content) -def get_content(db, content_hash, compression='snappy', beautify=True): +def get_content(db, content_hash, compression="snappy", beautify=True): """ Returns decompressed content from javascript leveldb database """ if content_hash is None: print("ERROR: content_hash can't be None...") @@ -171,12 +171,14 @@ def get_content(db, content_hash, compression='snappy', beautify=True): if content is None: print("ERROR: content hash: %s NOT FOUND" % content_hash) return - supported = ['snappy', 'none', 'gzip'] + supported = ["snappy", "none", "gzip"] if compression not in supported: - print("Unsupported compression type %s. Only %s " - "are the supported options." % (compression, str(supported))) + print( + "Unsupported compression type %s. Only %s " + "are the supported options." % (compression, str(supported)) + ) return - elif compression == 'gzip': + elif compression == "gzip": try: content = zlib.decompress(content, zlib.MAX_WBITS | 16) except Exception: diff --git a/openwpm_utils/dataquality.py b/openwpm_utils/dataquality.py index d4a36f9..f16ac1a 100644 --- a/openwpm_utils/dataquality.py +++ b/openwpm_utils/dataquality.py @@ -1,5 +1,9 @@ -from pyspark.sql.functions import countDistinct, col, isnan, lit, sum, count, when +import pyspark.sql.functions as F from pyspark.mllib.stat import Statistics +from pyspark.sql.dataframe import DataFrame +from pyspark.sql.functions import col, count, countDistinct, isnan, lit, sum, when + +from openwpm_utils.crawlhistory import get_worst_status_per_visit_id def count_not_null(c, nan_as_null=False): @@ -53,3 +57,23 @@ def check_df(df, skip_null_check=True): "\nNumber of records with visit_id == -1: %d" % df.where(df.visit_id == -1).count() ) + + +class TableFilter: + def __init__(self, incomplete_visits, crawl_history): + self._incomplete_visit_ids = incomplete_visits.select("visit_id") + self._failed_visit_ids = ( + get_worst_status_per_visit_id(crawl_history) + .where(F.col("worst_status") != "ok") + .select("visit_id") + ) + + def clean_table(self, table: DataFrame) -> DataFrame: + return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( + self._incomplete_visit_ids, "visit_id", how="leftanti" + ) + + def dirty_table(self, table: DataFrame) -> DataFrame: + return table.join(self._failed_visit_ids, "visit_id", how="inner").union( + table.join(self._incomplete_visit_ids, "visit_id", how="inner") + ) diff --git a/openwpm_utils/gcs.py b/openwpm_utils/gcs.py index 9d407a5..ff8af12 100644 --- a/openwpm_utils/gcs.py +++ b/openwpm_utils/gcs.py @@ -1,21 +1,23 @@ -from typing import List, Optional, Union +from typing import Any, Dict, List, Optional, Union import gcsfs -from google.api_core.exceptions import NotFound import jsbeautifier import pyarrow.parquet as pq import pyspark.sql.functions as F from google.cloud import storage -from pyspark.context import SparkContext -from pyspark.sql import SQLContext, DataFrame from pandas import DataFrame as PandasDataFrame +from pyspark.sql import DataFrame +from pyspark.sql.session import SparkSession -from openwpm_utils.crawlhistory 
import get_worst_status_per_visit_id +from openwpm_utils.dataquality import TableFilter class GCSDataset(object): def __init__( - self, base_dir: str, bucket: Optional[str] = "openwpm-data", **kwargs + self, + base_dir: str, + bucket: Optional[str] = "openwpm-data", + **kwargs: Dict[Any, Any], ) -> None: """Helper class to load OpenWPM datasets from GCS using pandas @@ -38,7 +40,7 @@ def __init__( self._content_key = f"{base_dir}/content/%s.gz" self._gcsfs = gcsfs.GCSFileSystem(**kwargs) - def read_table(self, table_name: str, columns: List[str]=None) -> PandasDataFrame: + def read_table(self, table_name: str, columns: List[str] = None) -> PandasDataFrame: """Read `table_name` from OpenWPM dataset into a pandas dataframe. Parameters @@ -79,10 +81,10 @@ def collect_content( class PySparkGCSDataset(GCSDataset): def __init__( self, - spark_context: SparkContext, + spark_session: SparkSession, base_dir: str, bucket: str = "openwpm-data", - **kwargs, + **kwargs: Dict[Any, Any], ) -> None: """Helper class to load OpenWPM datasets from GCS using PySpark @@ -97,23 +99,19 @@ def __init__( The bucket name on GCS. Defaults to `openwpm-data`. """ super().__init__(base_dir, bucket, **kwargs) - self._spark_context = spark_context - self._sql_context = SQLContext(spark_context) + self._spark_session = spark_session self._table_location_format_string = ( f"gs://{self._table_location_format_string}" ) - self._incomplete_visit_ids = self.read_table( - "incomplete_visits", mode="all" - ).select("visit_id") + incomplete_visits = self.read_table("incomplete_visits", mode="all") crawl_history = self.read_table("crawl_history", mode="all") - self._failed_visit_ids = ( - get_worst_status_per_visit_id(crawl_history) - .where(F.col("worst_status") != "ok") - .select("visit_id") - ) + self._filter = TableFilter(incomplete_visits, crawl_history) def read_table( - self, table_name: str, columns: Optional[List[str]] = None, mode: str = "successful" + self, + table_name: str, + columns: Optional[List[str]] = None, + mode: str = "successful", ) -> DataFrame: """Read `table_name` from OpenWPM dataset into a pyspark dataframe. @@ -128,24 +126,23 @@ def read_table( Success is determined per visit_id. 
A visit_id is failed if one of it's commands failed or if it's in the interrupted table """ - table = self._sql_context.read.parquet( + table = self._spark_session.read.parquet( self._table_location_format_string % table_name ) - if columns is not None: - table = table.select(columns) + if mode == "all": - return table + table = table if mode == "failed": - return table.join(self._failed_visit_ids, "visit_id", how="inner").union( - table.join(self._incomplete_visit_ids, "visit_id", how="inner") - ) + table = self._filter.dirty_table(table) if mode == "successful": - return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( - self._incomplete_visit_ids, "visit_id", how="leftanti" - ) + table = self._filter.clean_table(table) else: raise AssertionError( f"Mode was ${mode}," "allowed modes are 'all', 'failed' and 'successful'" ) + + if columns is not None: + table = table.select(columns) + return table diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index d8ea261..0e42c4b 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -8,13 +8,15 @@ import s3fs from botocore.exceptions import ClientError from pyarrow.filesystem import S3FSWrapper # noqa -from pyspark.sql import SQLContext +from pyspark import SparkContext +from pyspark.sql import DataFrame, SQLContext from openwpm_utils.crawlhistory import get_worst_status_per_visit_id +from openwpm_utils.dataquality import TableFilter -class S3Dataset(object): - def __init__(self, s3_directory, s3_bucket="openwpm-crawls"): +class S3Dataset: + def __init__(self, s3_directory: str, s3_bucket: str = "openwpm-crawls"): """Helper class to load OpenWPM datasets from S3 using pandas This dataset wrapper is safe to use by spark worker processes, as it @@ -82,8 +84,14 @@ def collect_content(self, content_hash, beautify=False): pass return content + class PySparkS3Dataset(S3Dataset): - def __init__(self, spark_context, s3_directory: str, s3_bucket:str="openwpm-crawls"): + def __init__( + self, + spark_context: SparkContext, + s3_directory: str, + s3_bucket: str = "openwpm-crawls", + ) -> None: """Helper class to load OpenWPM datasets from S3 using PySpark Parameters @@ -96,23 +104,17 @@ def __init__(self, spark_context, s3_directory: str, s3_bucket:str="openwpm-craw s3_bucket : string, optional The bucket name on S3. Defaults to `openwpm-crawls`. """ - super.__init__(s3_directory, s3_bucket) + super().__init__(s3_directory, s3_bucket) self._spark_context = spark_context self._sql_context = SQLContext(spark_context) self._s3_table_loc = f"s3a://{self._s3_table_loc}" - self._incomplete_visit_ids = self.read_table( - "incomplete_visits", mode="all" - ).select("visit_id") + incomplete_visits = self.read_table("incomplete_visits", mode="all") crawl_history = self.read_table("crawl_history", mode="all") - self._failed_visit_ids = ( - get_worst_status_per_visit_id(crawl_history) - .where(F.col("worst_status") != "ok") - .select("visit_id") - ) + self._filter = TableFilter(incomplete_visits, crawl_history) def read_table( self, table_name: str, columns: List[str] = None, mode: str = "successful" - ): + ) -> DataFrame: """Read `table_name` from OpenWPM dataset into a pyspark dataframe. 
Parameters @@ -127,21 +129,19 @@ def read_table( if one of it's commands failed or if it's in the interrupted table """ table = self._sql_context.read.parquet(self._s3_table_loc % table_name) - if columns is not None: - table = table.select(columns) if mode == "all": - return table + table = table if mode == "failed": - return table.join(self._failed_visit_ids, "visit_id", how="inner").union( - table.join(self._incomplete_visit_ids, "visit_id", how="inner") - ) + table = self._filter.dirty_table(table) if mode == "successful": - return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( - self._incomplete_visit_ids, "visit_id", how="leftanti" - ) + table = self._filter.clean_table(table) else: raise AssertionError( f"Mode was ${mode}," "allowed modes are 'all', 'failed' and 'successful'" ) + + if columns is not None: + table = table.select(columns) + return table diff --git a/setup.cfg b/setup.cfg index 3162fbc..f6dac37 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,9 +1,10 @@ -[aliases] -test=pytest +[tool:isort] +profile = black +default_section = THIRDPARTY +skip = build, dist, venv -[tool:pytest] -addopts = --flake8 -rw -testpaths = tests - -[flake8] -max-line-length = 88 \ No newline at end of file +[mypy] +python_version = 3.9 +warn_unused_configs = True +ignore_missing_imports = True +disallow_incomplete_defs = True diff --git a/setup.py b/setup.py index 9ba38d6..a049e78 100644 --- a/setup.py +++ b/setup.py @@ -1,40 +1,39 @@ from setuptools import setup -with open('requirements.txt') as f: +with open("requirements.txt") as f: requirements = f.read().splitlines() setup( # Meta - author='Steven Englehardt', - author_email='senglehardt@mozilla.com', - description='Tools for parsing crawl data generated by OpenWPM', - name='openwpm-utils', - license='MPL 2.0', - url='https://github.com/mozilla/openwpm-utils', - version='0.3.0', - packages=['openwpm_utils'], - + author="Steven Englehardt", + author_email="senglehardt@mozilla.com", + description="Tools for parsing crawl data generated by OpenWPM", + name="openwpm-utils", + license="MPL 2.0", + url="https://github.com/mozilla/openwpm-utils", + version="0.3.0", + packages=["openwpm_utils"], # Dependencies install_requires=requirements, - setup_requires=['setuptools_scm',], - + setup_requires=[ + "setuptools_scm", + ], # Packaging include_package_data=True, use_scm_version=False, zip_safe=False, - # Classifiers classifiers=[ - 'Development Status :: 3 - Alpha', - 'Environment :: Web Environment :: Mozilla', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Topic :: Internet :: WWW/HTTP', - 'Topic :: Scientific/Engineering :: Information Analysis' + "Development Status :: 3 - Alpha", + "Environment :: Web Environment :: Mozilla", + "Intended Audience :: Developers", + "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Scientific/Engineering :: Information Analysis", ], ) diff --git a/tests/test_crawlhistory.py b/tests/test_crawlhistory.py index bd10f58..9f8d6f0 100644 --- a/tests/test_crawlhistory.py +++ 
b/tests/test_crawlhistory.py @@ -1,20 +1,24 @@ -from openwpm_utils.crawlhistory import get_worst_status_per_visit_id, reduce_to_worst_command_status - from collections import namedtuple + import pyspark as spark -srow = namedtuple('simple_row', 'visit_id command_status'.split()) +from openwpm_utils.crawlhistory import ( + get_worst_status_per_visit_id, + reduce_to_worst_command_status, +) + +srow = namedtuple("simple_row", ["visit_id", "command_status"]) data = [ - srow('1', "critical"), - srow('1', "ok"), - srow('2', "ok"), - srow('3', "neterror"), - srow('3', "timeout") + srow("1", "critical"), + srow("1", "ok"), + srow("2", "ok"), + srow("3", "neterror"), + srow("3", "timeout"), ] data2 = [ - srow('1', ["ok", "critical"]), - srow('2', ["ok"]), - srow('3', ["timeout", "neterror"]), + srow("1", ["ok", "critical"]), + srow("2", ["ok"]), + srow("3", ["timeout", "neterror"]), ] test_df = spark.createDataFrame(data) @@ -23,4 +27,4 @@ test_df2.printSchema() -display(get_worst_status_per_visit_id(test_df)) \ No newline at end of file +get_worst_status_per_visit_id(test_df).show() From e1529261fd598eb191350bd38c489a508f4e29ae Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Tue, 6 Apr 2021 14:14:40 +0200 Subject: [PATCH 26/28] Changing base path --- openwpm_utils/gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openwpm_utils/gcs.py b/openwpm_utils/gcs.py index ff8af12..727264c 100644 --- a/openwpm_utils/gcs.py +++ b/openwpm_utils/gcs.py @@ -36,7 +36,7 @@ def __init__( self._kwargs = kwargs self._bucket = bucket self._base_dir = base_dir - self._table_location_format_string = f"{bucket}/{base_dir}/visits/%s" + self._table_location_format_string = f"{bucket}/{base_dir}/%s" self._content_key = f"{base_dir}/content/%s.gz" self._gcsfs = gcsfs.GCSFileSystem(**kwargs) From 7793799a067ad10a0c38bfa14e8278d04c93e69e Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Tue, 6 Apr 2021 16:23:01 +0200 Subject: [PATCH 27/28] More typing --- openwpm_utils/crawlhistory.py | 4 ++-- openwpm_utils/dataquality.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index e1c45c0..29a7635 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -35,7 +35,7 @@ def get_worst_status_per_visit_id(crawl_history): def display_crawl_history_per_command_sequence( crawl_history: DataFrame, interrupted_visits: DataFrame -) -> DataFrame: +) -> None: """Analyzes crawl_history and interrupted_visits to display general success statistics grouped by command_sequence @@ -133,7 +133,7 @@ def extract_website_from_arguments(arguments): F.col("command") == "GetCommand" ).withColumn("website", udf_extract_website_from_arguments("arguments")) - visit_id_to_website = visit_id_to_website[["visit_id", "website"]] + visit_id_to_website = visit_id_to_website.select("visit_id", "website") visit_id_website_status = visit_id_and_worst_status.join( visit_id_to_website, "visit_id" diff --git a/openwpm_utils/dataquality.py b/openwpm_utils/dataquality.py index f16ac1a..e7a2ce8 100644 --- a/openwpm_utils/dataquality.py +++ b/openwpm_utils/dataquality.py @@ -60,7 +60,7 @@ def check_df(df, skip_null_check=True): class TableFilter: - def __init__(self, incomplete_visits, crawl_history): + def __init__(self, incomplete_visits: DataFrame, crawl_history: DataFrame) -> None: self._incomplete_visit_ids = incomplete_visits.select("visit_id") self._failed_visit_ids = ( get_worst_status_per_visit_id(crawl_history) From 
a07b8d43bc4291ff693e530a0b8ffbb38b43b054 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Tue, 6 Apr 2021 18:05:25 +0200 Subject: [PATCH 28/28] Elif in load_table --- openwpm_utils/gcs.py | 7 +++---- openwpm_utils/s3.py | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/openwpm_utils/gcs.py b/openwpm_utils/gcs.py index 727264c..6d55fe9 100644 --- a/openwpm_utils/gcs.py +++ b/openwpm_utils/gcs.py @@ -132,14 +132,13 @@ def read_table( if mode == "all": table = table - if mode == "failed": + elif mode == "failed": table = self._filter.dirty_table(table) - if mode == "successful": + elif mode == "successful": table = self._filter.clean_table(table) else: raise AssertionError( - f"Mode was ${mode}," - "allowed modes are 'all', 'failed' and 'successful'" + f"Mode was {mode}," "allowed modes are 'all', 'failed' and 'successful'" ) if columns is not None: diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index 0e42c4b..2242a9e 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -131,9 +131,9 @@ def read_table( table = self._sql_context.read.parquet(self._s3_table_loc % table_name) if mode == "all": table = table - if mode == "failed": + elif mode == "failed": table = self._filter.dirty_table(table) - if mode == "successful": + elif mode == "successful": table = self._filter.clean_table(table) else: raise AssertionError(