28 changes: 28 additions & 0 deletions openwpm_utils/crawlhistory.py
@@ -0,0 +1,28 @@
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

reduce_to_worst_command_status = (
    F.when(F.array_contains("command_status", "critical"), "critical")
    .when(F.array_contains("command_status", "error"), "error")
    .when(F.array_contains("command_status", "neterror"), "neterror")
    .when(F.array_contains("command_status", "timeout"), "timeout")
    .otherwise("ok")
    .alias("worst_status")
)


reduce_to_best_command_status = (
    F.when(F.array_contains("command_status", "ok"), "ok")
    .when(F.array_contains("command_status", "timeout"), "timeout")
    .when(F.array_contains("command_status", "neterror"), "neterror")
    .when(F.array_contains("command_status", "error"), "error")
    .otherwise("critical")
    .alias("best_status")
)


def get_worst_status_per_visit_id(crawl_history):
    """Adds column `worst_status`"""
    return (crawl_history.groupBy("visit_id")
            .agg(F.collect_list("command_status").alias("command_status"))
            .withColumn("worst_status", reduce_to_worst_command_status))
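For orientation, a minimal usage sketch of the new helper; the SparkSession `spark` and the parquet path are assumptions, not part of this change:

# Sketch only: load the crawl_history table and keep the failed visit_ids.
# The "<crawl-dir>" path segment is a placeholder for a real crawl directory.
from openwpm_utils.crawlhistory import get_worst_status_per_visit_id

crawl_history = spark.read.parquet("s3a://openwpm-crawls/<crawl-dir>/visits/crawl_history/")
worst_per_visit = get_worst_status_per_visit_id(crawl_history)
failed_visit_ids = worst_per_visit.where(worst_per_visit.worst_status != "ok").select("visit_id")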
26 changes: 25 additions & 1 deletion openwpm_utils/dataquality.py
@@ -1,5 +1,9 @@
from pyspark.sql.functions import countDistinct, col, isnan, lit, sum, count, when
import pyspark.sql.functions as F
from pyspark.mllib.stat import Statistics
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, count, countDistinct, isnan, lit, sum, when

from openwpm_utils.crawlhistory import get_worst_status_per_visit_id


def count_not_null(c, nan_as_null=False):
@@ -53,3 +57,23 @@ def check_df(df, skip_null_check=True):
"\nNumber of records with visit_id == -1: %d"
% df.where(df.visit_id == -1).count()
)


class TableFilter:
    def __init__(self, incomplete_visits: DataFrame, crawl_history: DataFrame) -> None:
        self._incomplete_visit_ids = incomplete_visits.select("visit_id")
        self._failed_visit_ids = (
            get_worst_status_per_visit_id(crawl_history)
            .where(F.col("worst_status") != "ok")
            .select("visit_id")
        )

    def clean_table(self, table: DataFrame) -> DataFrame:
        return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join(
            self._incomplete_visit_ids, "visit_id", how="leftanti"
        )

    def dirty_table(self, table: DataFrame) -> DataFrame:
        return table.join(self._failed_visit_ids, "visit_id", how="inner").union(
            table.join(self._incomplete_visit_ids, "visit_id", how="inner")
        )
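A sketch of how TableFilter can be used on its own; the table paths and the `spark` session are assumptions (PySparkS3Dataset in s3.py wires this up automatically):

# Sketch only: build the filter from the bookkeeping tables, then split any
# data table into clean (fully successful visits) and dirty (failed or interrupted) rows.
from openwpm_utils.dataquality import TableFilter

incomplete_visits = spark.read.parquet("s3a://openwpm-crawls/<crawl-dir>/visits/incomplete_visits/")
crawl_history = spark.read.parquet("s3a://openwpm-crawls/<crawl-dir>/visits/crawl_history/")
table_filter = TableFilter(incomplete_visits, crawl_history)

http_requests = spark.read.parquet("s3a://openwpm-crawls/<crawl-dir>/visits/http_requests/")
clean_requests = table_filter.clean_table(http_requests)
dirty_requests = table_filter.dirty_table(http_requests)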
151 changes: 80 additions & 71 deletions openwpm_utils/s3.py
@@ -1,76 +1,22 @@
import gzip
import io
from typing import List

import boto3
import jsbeautifier
import pyarrow.parquet as pq
import pyspark.sql.functions as F
import s3fs
from botocore.exceptions import ClientError
from pyarrow.filesystem import S3FSWrapper # noqa
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import DataFrame, SQLContext


class PySparkS3Dataset(object):
    def __init__(self, spark_context, s3_directory,
                 s3_bucket='openwpm-crawls'):
        """Helper class to load OpenWPM datasets from S3 using PySpark

        Parameters
        ----------
        spark_context
            Spark context. In databricks, this is available via the `sc`
            variable.
        s3_directory : string
            Directory within the S3 bucket in which the dataset is saved.
        s3_bucket : string, optional
            The bucket name on S3. Defaults to `openwpm-crawls`.
        """
        self._s3_bucket = s3_bucket
        self._s3_directory = s3_directory
        self._spark_context = spark_context
        self._sql_context = SQLContext(spark_context)
        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
            s3_bucket, s3_directory)
        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
            s3_bucket, s3_directory)

    def read_table(self, table_name, columns=None):
        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.

        Parameters
        ----------
        table_name : string
            OpenWPM table to read
        columns : list of strings
            The set of columns to filter the parquet dataset by
        """
        table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
        if columns is not None:
            return table.select(columns)
        return table

    def read_content(self, content_hash):
        """Read the content corresponding to `content_hash`.

        NOTE: This can only be run in the driver process since it requires
        access to the spark context
        """
        return self._spark_context.textFile(
            self._s3_content_loc % content_hash)

    def collect_content(self, content_hash, beautify=False):
        """Collect content for `content_hash` to driver

        NOTE: This can only be run in the driver process since it requires
        access to the spark context
        """
        content = ''.join(self.read_content(content_hash).collect())
        if beautify:
            return jsbeautifier.beautify(content)
        return content
from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
from openwpm_utils.dataquality import TableFilter


class S3Dataset(object):
    def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
    def __init__(self, s3_directory, s3_bucket="openwpm-crawls"):
        """Helper class to load OpenWPM datasets from S3 using pandas

        This dataset wrapper is safe to use by spark worker processes, as it
@@ -99,30 +45,33 @@ def read_table(self, table_name, columns=None):
        columns : list of strings
            The set of columns to filter the parquet dataset by
        """
        return pq.ParquetDataset(
            self._s3_table_loc % table_name,
            filesystem=self._s3fs,
            metadata_nthreads=4
        ).read(use_pandas_metadata=True, columns=columns).to_pandas()
        return (
            pq.ParquetDataset(
                self._s3_table_loc % table_name,
                filesystem=self._s3fs,
                metadata_nthreads=4,
            )
            .read(use_pandas_metadata=True, columns=columns)
            .to_pandas()
        )

    def collect_content(self, content_hash, beautify=False):
        """Collect content by directly connecting to S3 via boto3"""
        s3 = boto3.client('s3')
        s3 = boto3.client("s3")
        try:
            obj = s3.get_object(
                Bucket=self._s3_bucket,
                Key=self._content_key % content_hash
                Bucket=self._s3_bucket, Key=self._content_key % content_hash
            )
            body = obj["Body"]
            compressed_content = body.read()
            body.close()
        except ClientError as e:
            if e.response['Error']['Code'] != 'NoSuchKey':
            if e.response["Error"]["Code"] != "NoSuchKey":
                raise
            else:
                return None

        with gzip.GzipFile(fileobj=compressed_content, mode='r') as f:
        # Wrap the raw bytes in BytesIO so GzipFile receives a file-like object.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed_content), mode="r") as f:
            content = f.read()

        if content is None or content == "":
@@ -134,3 +83,63 @@ def collect_content(self, content_hash, beautify=False):
        except IndexError:
            pass
        return content
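For context, a worker-safe lookup with the pandas-based S3Dataset might look like this; the crawl directory and the columns used are assumptions:

# Sketch only: read a table into pandas and fetch one saved response body.
from openwpm_utils.s3 import S3Dataset

dataset = S3Dataset("<crawl-dir>")
responses = dataset.read_table("http_responses", columns=["visit_id", "content_hash"])
body = dataset.collect_content(responses["content_hash"].iloc[0], beautify=True)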


class PySparkS3Dataset(S3Dataset):
    def __init__(
        self, spark_context, s3_directory: str, s3_bucket: str = "openwpm-crawls"
    ):
        """Helper class to load OpenWPM datasets from S3 using PySpark

        Parameters
        ----------
        spark_context
            Spark context. In databricks, this is available via the `sc`
            variable.
        s3_directory : string
            Directory within the S3 bucket in which the dataset is saved.
        s3_bucket : string, optional
            The bucket name on S3. Defaults to `openwpm-crawls`.
        """
        super().__init__(s3_directory, s3_bucket)
        self._spark_context = spark_context
        self._sql_context = SQLContext(spark_context)
        self._s3_table_loc = f"s3a://{self._s3_table_loc}"
        incomplete_visits = self.read_table("incomplete_visits", mode="all")
        crawl_history = self.read_table("crawl_history", mode="all")
        self._filter = TableFilter(incomplete_visits, crawl_history)

    def read_table(
        self, table_name: str, columns: List[str] = None, mode: str = "successful"
    ):
        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.

        Parameters
        ----------
        table_name : string
            OpenWPM table to read
        columns : list of strings
            The set of columns to filter the parquet dataset by
        mode : string
            The valid values are "successful", "failed" and "all".
            Success is determined per visit_id: a visit_id is failed
            if one of its commands failed or if it appears in the
            incomplete_visits table.
        """
        table = self._sql_context.read.parquet(self._s3_table_loc % table_name)

        if mode == "all":
            pass
        elif mode == "failed":
            table = self._filter.dirty_table(table)
        elif mode == "successful":
            table = self._filter.clean_table(table)
        else:
            raise AssertionError(
                f"Mode was {mode}, "
                "allowed modes are 'all', 'failed' and 'successful'"
            )

        if columns is not None:
            table = table.select(columns)

        return table
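A sketch of the new mode-aware read path; the Databricks `sc` variable and the crawl directory are assumptions:

# Sketch only: the default mode drops failed and interrupted visits.
from openwpm_utils.s3 import PySparkS3Dataset

dataset = PySparkS3Dataset(sc, "<crawl-dir>")
ok_requests = dataset.read_table("http_requests")                    # mode="successful"
failed_requests = dataset.read_table("http_requests", mode="failed")
everything = dataset.read_table("http_requests", mode="all")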
2 changes: 1 addition & 1 deletion setup.py
@@ -11,7 +11,7 @@
    name='openwpm-utils',
    license='MPL 2.0',
    url='https://github.com/mozilla/openwpm-utils',
    version='0.2.0',
    version='0.3.0',
    packages=['openwpm_utils'],

    # Dependencies