From 33bb9a298c1f115aa8e604bce14eb9261e4bdc22 Mon Sep 17 00:00:00 2001
From: Stefan Zabka
Date: Fri, 9 Apr 2021 12:37:41 +0200
Subject: [PATCH 1/2] Removed collect_content from PySparkS3Dataset

Downloading files via the SparkContext was much slower than downloading
via boto (which is what S3Dataset does). So now both classes use the
same method, as PySparkS3Dataset inherits from S3Dataset.
---
 openwpm_utils/s3.py | 102 +++++++++++++++++---------------------------
 1 file changed, 40 insertions(+), 62 deletions(-)

diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py
index b533e03..5654ba8 100644
--- a/openwpm_utils/s3.py
+++ b/openwpm_utils/s3.py
@@ -8,68 +8,7 @@
 from pyarrow.filesystem import S3FSWrapper  # noqa
 from pyspark.sql import SQLContext
 
-
-class PySparkS3Dataset(object):
-    def __init__(self, spark_context, s3_directory,
-                 s3_bucket='openwpm-crawls'):
-        """Helper class to load OpenWPM datasets from S3 using PySpark
-
-        Parameters
-        ----------
-        spark_context
-            Spark context. In databricks, this is available via the `sc`
-            variable.
-        s3_directory : string
-            Directory within the S3 bucket in which the dataset is saved.
-        s3_bucket : string, optional
-            The bucket name on S3. Defaults to `openwpm-crawls`.
-        """
-        self._s3_bucket = s3_bucket
-        self._s3_directory = s3_directory
-        self._spark_context = spark_context
-        self._sql_context = SQLContext(spark_context)
-        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
-            s3_bucket, s3_directory)
-        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
-            s3_bucket, s3_directory)
-
-    def read_table(self, table_name, columns=None):
-        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
-
-        Parameters
-        ----------
-        table_name : string
-            OpenWPM table to read
-        columns : list of strings
-            The set of columns to filter the parquet dataset by
-        """
-        table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
-        if columns is not None:
-            return table.select(columns)
-        return table
-
-    def read_content(self, content_hash):
-        """Read the content corresponding to `content_hash`.
-
-        NOTE: This can only be run in the driver process since it requires
-        access to the spark context
-        """
-        return self._spark_context.textFile(
-            self._s3_content_loc % content_hash)
-
-    def collect_content(self, content_hash, beautify=False):
-        """Collect content for `content_hash` to driver
-
-        NOTE: This can only be run in the driver process since it requires
-        access to the spark context
-        """
-        content = ''.join(self.read_content(content_hash).collect())
-        if beautify:
-            return jsbeautifier.beautify(content)
-        return content
-
-
-class S3Dataset(object):
+class S3Dataset:
     def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
         """Helper class to load OpenWPM datasets from S3 using pandas
 
@@ -134,3 +73,42 @@ def collect_content(self, content_hash, beautify=False):
         except IndexError:
             pass
         return content
+
+class PySparkS3Dataset(S3Dataset):
+    def __init__(self, spark_context, s3_directory,
+                 s3_bucket='openwpm-crawls'):
+        """Helper class to load OpenWPM datasets from S3 using PySpark
+
+        Parameters
+        ----------
+        spark_context
+            Spark context. In databricks, this is available via the `sc`
+            variable.
+        s3_directory : string
+            Directory within the S3 bucket in which the dataset is saved.
+        s3_bucket : string, optional
+            The bucket name on S3. Defaults to `openwpm-crawls`.
+        """
+        self._s3_bucket = s3_bucket
+        self._s3_directory = s3_directory
+        self._spark_context = spark_context
+        self._sql_context = SQLContext(spark_context)
+        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
+            s3_bucket, s3_directory)
+        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
+            s3_bucket, s3_directory)
+
+    def read_table(self, table_name, columns=None):
+        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
+
+        Parameters
+        ----------
+        table_name : string
+            OpenWPM table to read
+        columns : list of strings
+            The set of columns to filter the parquet dataset by
+        """
+        table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
+        if columns is not None:
+            return table.select(columns)
+        return table

From cb8a25ff1a1876bc42cc4b21a6abcaec94124cb0 Mon Sep 17 00:00:00 2001
From: Stefan Zabka
Date: Mon, 15 Jun 2020 11:41:24 +0200
Subject: [PATCH 2/2] Added mode parameter to PySparkS3Dataset

This parameter allows for filtering out visit_ids that are part of
`incomplete_visits` or that had a command with a command_status other
than "ok", since users probably shouldn't consider them for analysis.
This filtering functionality is extracted into the TableFilter class
so that it can be reused by other Datasets.
---
 openwpm_utils/crawlhistory.py | 28 +++++++++++++
 openwpm_utils/dataquality.py  | 26 +++++++++++-
 openwpm_utils/s3.py           | 77 ++++++++++++++++++++++++-----------
 setup.py                      |  2 +-
 4 files changed, 108 insertions(+), 25 deletions(-)
 create mode 100644 openwpm_utils/crawlhistory.py

diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py
new file mode 100644
index 0000000..3799f47
--- /dev/null
+++ b/openwpm_utils/crawlhistory.py
@@ -0,0 +1,28 @@
+import pyspark.sql.functions as F
+from pyspark.sql.types import StringType
+
+reduce_to_worst_command_status = (
+    F.when(F.array_contains("command_status", "critical"), "critical")
+    .when(F.array_contains("command_status", "error"), "error")
+    .when(F.array_contains("command_status", "neterror"), "neterror")
+    .when(F.array_contains("command_status", "timeout"), "timeout")
+    .otherwise("ok")
+    .alias("worst_status")
+)
+
+
+reduce_to_best_command_status = (
+    F.when(F.array_contains("command_status", "ok"), "ok")
+    .when(F.array_contains("command_status", "timeout"), "timeout")
+    .when(F.array_contains("command_status", "neterror"), "neterror")
+    .when(F.array_contains("command_status", "error"), "error")
+    .otherwise("critical")
+    .alias("best_status")
+)
+
+
+def get_worst_status_per_visit_id(crawl_history):
+    """Adds column `worst_status`"""
+    return (crawl_history.groupBy("visit_id")
+            .agg(F.collect_list("command_status").alias("command_status"))
+            .withColumn("worst_status", reduce_to_worst_command_status))
diff --git a/openwpm_utils/dataquality.py b/openwpm_utils/dataquality.py
index d4a36f9..e7a2ce8 100644
--- a/openwpm_utils/dataquality.py
+++ b/openwpm_utils/dataquality.py
@@ -1,5 +1,9 @@
-from pyspark.sql.functions import countDistinct, col, isnan, lit, sum, count, when
+import pyspark.sql.functions as F
 from pyspark.mllib.stat import Statistics
+from pyspark.sql.dataframe import DataFrame
+from pyspark.sql.functions import col, count, countDistinct, isnan, lit, sum, when
+
+from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
 
 
 def count_not_null(c, nan_as_null=False):
@@ -53,3 +57,23 @@ def check_df(df, skip_null_check=True):
         "\nNumber of records with visit_id == -1: %d"
         % df.where(df.visit_id == -1).count()
     )
+
+
+class TableFilter:
+    def __init__(self, incomplete_visits: DataFrame, crawl_history: DataFrame) -> None:
+        self._incomplete_visit_ids = incomplete_visits.select("visit_id")
+        self._failed_visit_ids = (
+            get_worst_status_per_visit_id(crawl_history)
+            .where(F.col("worst_status") != "ok")
+            .select("visit_id")
+        )
+
+    def clean_table(self, table: DataFrame) -> DataFrame:
+        return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join(
+            self._incomplete_visit_ids, "visit_id", how="leftanti"
+        )
+
+    def dirty_table(self, table: DataFrame) -> DataFrame:
+        return table.join(self._failed_visit_ids, "visit_id", how="inner").union(
+            table.join(self._incomplete_visit_ids, "visit_id", how="inner")
+        )
diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py
index 5654ba8..3e8581c 100644
--- a/openwpm_utils/s3.py
+++ b/openwpm_utils/s3.py
@@ -1,15 +1,22 @@
 import gzip
+from typing import List
 
 import boto3
 import jsbeautifier
 import pyarrow.parquet as pq
+import pyspark.sql.functions as F
 import s3fs
 from botocore.exceptions import ClientError
 from pyarrow.filesystem import S3FSWrapper  # noqa
-from pyspark.sql import SQLContext
+from pyspark import SparkContext
+from pyspark.sql import DataFrame, SQLContext
 
-class S3Dataset:
-    def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
+from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
+from openwpm_utils.dataquality import TableFilter
+
+
+class S3Dataset(object):
+    def __init__(self, s3_directory, s3_bucket="openwpm-crawls"):
         """Helper class to load OpenWPM datasets from S3 using pandas
 
         This dataset wrapper is safe to use by spark worker processes, as it
@@ -38,30 +45,33 @@ def read_table(self, table_name, columns=None):
         columns : list of strings
             The set of columns to filter the parquet dataset by
         """
-        return pq.ParquetDataset(
-            self._s3_table_loc % table_name,
-            filesystem=self._s3fs,
-            metadata_nthreads=4
-        ).read(use_pandas_metadata=True, columns=columns).to_pandas()
+        return (
+            pq.ParquetDataset(
+                self._s3_table_loc % table_name,
+                filesystem=self._s3fs,
+                metadata_nthreads=4,
+            )
+            .read(use_pandas_metadata=True, columns=columns)
+            .to_pandas()
+        )
 
     def collect_content(self, content_hash, beautify=False):
         """Collect content by directly connecting to S3 via boto3"""
-        s3 = boto3.client('s3')
+        s3 = boto3.client("s3")
         try:
             obj = s3.get_object(
-                Bucket=self._s3_bucket,
-                Key=self._content_key % content_hash
+                Bucket=self._s3_bucket, Key=self._content_key % content_hash
             )
             body = obj["Body"]
             compressed_content = body.read()
             body.close()
         except ClientError as e:
-            if e.response['Error']['Code'] != 'NoSuchKey':
+            if e.response["Error"]["Code"] != "NoSuchKey":
                 raise
             else:
                 return None
 
-        with gzip.GzipFile(fileobj=compressed_content, mode='r') as f:
+        with gzip.GzipFile(fileobj=compressed_content, mode="r") as f:
             content = f.read()
 
         if content is None or content == "":
@@ -74,9 +84,11 @@ def collect_content(self, content_hash, beautify=False):
             pass
         return content
 
+
 class PySparkS3Dataset(S3Dataset):
-    def __init__(self, spark_context, s3_directory,
-                 s3_bucket='openwpm-crawls'):
+    def __init__(
+        self, spark_context, s3_directory: str, s3_bucket: str = "openwpm-crawls"
+    ):
         """Helper class to load OpenWPM datasets from S3 using PySpark
 
         Parameters
         ----------
         spark_context
             Spark context. In databricks, this is available via the `sc`
             variable.
         s3_directory : string
             Directory within the S3 bucket in which the dataset is saved.
         s3_bucket : string, optional
             The bucket name on S3. Defaults to `openwpm-crawls`.
         """
-        self._s3_bucket = s3_bucket
-        self._s3_directory = s3_directory
+        super().__init__(s3_directory, s3_bucket)
         self._spark_context = spark_context
         self._sql_context = SQLContext(spark_context)
-        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
-            s3_bucket, s3_directory)
-        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
-            s3_bucket, s3_directory)
+        self._s3_table_loc = f"s3a://{self._s3_table_loc}"
+        incomplete_visits = self.read_table("incomplete_visits", mode="all")
+        crawl_history = self.read_table("crawl_history", mode="all")
+        self._filter = TableFilter(incomplete_visits, crawl_history)
 
-    def read_table(self, table_name, columns=None):
+    def read_table(
+        self, table_name: str, columns: List[str] = None, mode: str = "successful"
+    ):
         """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
 
         Parameters
         ----------
         table_name : string
             OpenWPM table to read
         columns : list of strings
             The set of columns to filter the parquet dataset by
+        mode : string
+            The valid values are "successful", "failed", "all"
+            Success is determined per visit_id. A visit_id is failed
+            if one of its commands failed or if it is listed in incomplete_visits
         """
         table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
+
+        if mode == "all":
+            table = table
+        elif mode == "failed":
+            table = self._filter.dirty_table(table)
+        elif mode == "successful":
+            table = self._filter.clean_table(table)
+        else:
+            raise AssertionError(
+                f"Mode was {mode}, "
+                "allowed modes are 'all', 'failed' and 'successful'"
+            )
+
         if columns is not None:
-            return table.select(columns)
+            table = table.select(columns)
+        return table
diff --git a/setup.py b/setup.py
index 8e7bbee..9ba38d6 100644
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@
     name='openwpm-utils',
     license='MPL 2.0',
     url='https://github.com/mozilla/openwpm-utils',
-    version='0.2.0',
+    version='0.3.0',
     packages=['openwpm_utils'],
 
     # Dependencies