1 change: 0 additions & 1 deletion openwpm_utils/blocklist.py
@@ -86,7 +86,6 @@ def get_option_dict(url, top_level_url, resource_type=None):
options["third-party"] = True
return options


def prepare_get_matching_rules(blockers: List[BlockListParser]):
def get_matching_rules(url, top_level_url, resource_type):
# skip top-level requests
191 changes: 191 additions & 0 deletions openwpm_utils/crawlhistory.py
@@ -0,0 +1,191 @@
import json

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

reduce_to_worst_command_status = (
F.when(F.array_contains("command_status", "critical"), "critical")
.when(F.array_contains("command_status", "error"), "error")
.when(F.array_contains("command_status", "neterror"), "neterror")
.when(F.array_contains("command_status", "timeout"), "timeout")
.otherwise("ok")
.alias("worst_status")
)


reduce_to_best_command_status = (
F.when(F.array_contains("command_status", "ok"), "ok")
.when(F.array_contains("command_status", "timeout"), "timeout")
.when(F.array_contains("command_status", "neterror"), "neterror")
.when(F.array_contains("command_status", "error"), "error")
.otherwise("critical")
.alias("best_status")
)


def get_worst_status_per_visit_id(crawl_history):
    """Adds a `worst_status` column with the worst command status seen for each visit_id"""
    return (
        crawl_history.groupBy("visit_id")
        .agg(F.collect_list("command_status").alias("command_status"))
        .withColumn("worst_status", reduce_to_worst_command_status)
    )


def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits):
""" Analyzes crawl_history and interrupted_visits to display general
success statistics grouped by command_sequence

Parameters
----------
crawl_history: dataframe
The full ``crawl_history`` dataframe
interrupted_visits: dataframe
The full ``interrupted_visits`` dataframe

Examples
--------
>>> from openwpm_utils.s3 import PySparkS3Dataset
>>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET)
>>> crawl_history = dataset.read_table('crawl_history', mode="all")
>>> incomplete = dataset.read_table('incomplete_visits', mode="all")
>>> display_crawl_history_per_command_sequence(crawl_history, incomplete)

"""
crawl_history.groupBy("command").count().show()

# Analyzing status per command_sequence
total_num_command_sequences = crawl_history.groupBy("visit_id").count().count()
visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history)
print(
"Percentage of command_sequence that didn't complete successfully %0.2f%%"
% (
visit_id_and_worst_status.where(F.col("worst_status") != "ok").count()
/ float(total_num_command_sequences)
* 100
)
)
net_error_count = visit_id_and_worst_status.where(
F.col("worst_status") == "neterror"
).count()
print(
"There were a total of %d neterrors(%0.2f%% of the all command_sequences)"
% (net_error_count, net_error_count / float(total_num_command_sequences) * 100)
)
timeout_count = visit_id_and_worst_status.where(
F.col("worst_status") == "timeout"
).count()
print(
"There were a total of %d timeouts(%0.2f%% of the all command_sequences)"
% (timeout_count, timeout_count / float(total_num_command_sequences) * 100)
)

error_count = visit_id_and_worst_status.where(
F.col("worst_status") == "error"
).count()
print(
"There were a total of %d errors(%0.2f%% of the all command_sequences)"
% (error_count, error_count / float(total_num_command_sequences) * 100)
)

    print(
        f"A total of {interrupted_visits.count()} command_sequences were interrupted. "
        f"This represents {interrupted_visits.count() / float(total_num_command_sequences) * 100:.2f}% of the entire crawl."
    )


def display_crawl_history_per_website(crawl_history, interrupted_visits):
""" Analyzes crawl_history and interrupted_visits to display general
success statistics grouped by website

Parameters
----------
crawl_history: dataframe
The full ``crawl_history`` dataframe
interrupted_visits: dataframe
The full ``interrupted_visits`` dataframe

Examples
--------
>>> from openwpm_utils.s3 import PySparkS3Dataset
>>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET)
>>> crawl_history = dataset.read_table('crawl_history', mode="all")
>>> incomplete = dataset.read_table('incomplete_visits', mode="all")
>>> display_crawl_history_per_website(crawl_history, incomplete)

"""
visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history)

def extract_website_from_arguments(arguments):
"""Given the arguments of a get_command this function returns which website was visited"""
return json.loads(arguments)["url"]

udf_extract_website_from_arguments = F.udf(
extract_website_from_arguments, StringType()
)

visit_id_to_website = crawl_history.where(
F.col("command") == "GetCommand"
).withColumn("website", udf_extract_website_from_arguments("arguments"))

visit_id_to_website = visit_id_to_website[["visit_id", "website"]]

visit_id_website_status = visit_id_and_worst_status.join(
visit_id_to_website, "visit_id"
)
best_status_per_website = (
visit_id_website_status.groupBy("website")
.agg(F.collect_list("worst_status").alias("command_status"))
.withColumn("best_status",reduce_to_best_command_status)
)

total_number_websites = best_status_per_website.count()

print(f"There was an attempt to visit a total of {total_number_websites} websites")

print(
"Percentage of websites that didn't complete successfully %0.2f%%"
% (
best_status_per_website.where(F.col("best_status") != "ok").count()
/ float(total_number_websites)
* 100
)
)
net_error_count = best_status_per_website.where(
F.col("best_status") == "neterror"
).count()
print(
"There were a total of %d neterrors (%0.2f%% of the all websites)"
% (net_error_count, net_error_count / float(total_number_websites) * 100)
)
timeout_count = best_status_per_website.where(
F.col("best_status") == "timeout"
).count()
print(
"There were a total of %d timeouts (%0.2f%% of the all websites)"
% (timeout_count, timeout_count / float(total_number_websites) * 100)
)

error_count = best_status_per_website.where(F.col("best_status") == "error").count()

print(
"There were a total of %d errors (%0.2f%% of the websites)"
% (error_count, error_count / float(total_number_websites) * 100)
)

multiple_successes = (
visit_id_website_status.where(F.col("worst_status") == "ok")
.join(interrupted_visits, "visit_id", how="leftanti")
.groupBy("website")
.count()
.filter("count > 1")
.orderBy(F.desc("count"))
)

print(
f"There were {multiple_successes.count()} websites that were successfully visited multiple times"
)
multiple_successes.groupBy(
F.col("count").alias("Number of successes")
).count().show()

print("A list of all websites that where successfully visited more than twice:")
multiple_successes.filter("count > 2").show()
26 changes: 25 additions & 1 deletion openwpm_utils/dataquality.py
@@ -1,5 +1,9 @@
from pyspark.sql.functions import countDistinct, col, isnan, lit, sum, count, when
import pyspark.sql.functions as F
from pyspark.mllib.stat import Statistics
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, count, countDistinct, isnan, lit, sum, when

from openwpm_utils.crawlhistory import get_worst_status_per_visit_id


def count_not_null(c, nan_as_null=False):
@@ -53,3 +57,23 @@ def check_df(df, skip_null_check=True):
"\nNumber of records with visit_id == -1: %d"
% df.where(df.visit_id == -1).count()
)


class TableFilter:
    """Splits OpenWPM tables by visit_id into records from fully successful visits and records from failed or interrupted visits"""

    def __init__(self, incomplete_visits: DataFrame, crawl_history: DataFrame) -> None:
self._incomplete_visit_ids = incomplete_visits.select("visit_id")
self._failed_visit_ids = (
get_worst_status_per_visit_id(crawl_history)
.where(F.col("worst_status") != "ok")
.select("visit_id")
)

def clean_table(self, table: DataFrame) -> DataFrame:
return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join(
self._incomplete_visit_ids, "visit_id", how="leftanti"
)

def dirty_table(self, table: DataFrame) -> DataFrame:
return table.join(self._failed_visit_ids, "visit_id", how="inner").union(
table.join(self._incomplete_visit_ids, "visit_id", how="inner")
)
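
A short sketch of how TableFilter is intended to be used, assuming the `incomplete_visits`, `crawl_history`, and `http_requests` dataframes have already been loaded (for example via one of the dataset helpers in this PR); `http_requests` here is just an illustrative table name:

from openwpm_utils.dataquality import TableFilter

# Build the filter once from the two bookkeeping tables.
table_filter = TableFilter(incomplete_visits, crawl_history)

# Keep only records from visits whose every command succeeded and that
# were not interrupted.
clean_requests = table_filter.clean_table(http_requests)

# The complement: records from failed or interrupted visits.
dirty_requests = table_filter.dirty_table(http_requests)
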
151 changes: 151 additions & 0 deletions openwpm_utils/gcs.py
@@ -0,0 +1,151 @@
from typing import List, Optional, Union

import gcsfs
from google.api_core.exceptions import NotFound
import jsbeautifier
import pyarrow.parquet as pq
import pyspark.sql.functions as F
from google.cloud import storage
from pyspark.context import SparkContext
from pyspark.sql import SQLContext, DataFrame
from pandas import DataFrame as PandasDataFrame

from openwpm_utils.crawlhistory import get_worst_status_per_visit_id


class GCSDataset(object):
def __init__(
self, base_dir: str, bucket: Optional[str] = "openwpm-data", **kwargs
) -> None:
"""Helper class to load OpenWPM datasets from GCS using pandas

This dataset wrapper is safe to use by spark worker processes, as it
does not require the spark context.

Parameters
----------
base_dir
Directory within the GCS bucket in which the dataset is saved.
bucket
The bucket name on GCS.
**kwargs
Passed on to GCSFS so you can customize it to your needs
"""
self._kwargs = kwargs
self._bucket = bucket
self._base_dir = base_dir
self._table_location_format_string = f"{bucket}/{base_dir}/visits/%s"
self._content_key = f"{base_dir}/content/%s.gz"
self._gcsfs = gcsfs.GCSFileSystem(**kwargs)

    def read_table(self, table_name: str, columns: Optional[List[str]] = None) -> PandasDataFrame:
"""Read `table_name` from OpenWPM dataset into a pandas dataframe.

Parameters
----------
table_name : string
OpenWPM table to read
        columns : list of strings
            The subset of columns to read from the parquet dataset
"""
return (
pq.ParquetDataset(
self._table_location_format_string % table_name,
filesystem=self._gcsfs,
metadata_nthreads=4,
)
.read(use_pandas_metadata=True, columns=columns)
.to_pandas()
)

def collect_content(
self, content_hash: str, beautify: bool = False
) -> Optional[Union[bytes, str]]:
"""Collect content by directly connecting to GCS via google.cloud.storage"""
storage_client = storage.Client()
bucket = storage_client.bucket(self._bucket)

blob = bucket.blob(self._content_key % content_hash)
content: Union[bytes, str] = blob.download_as_bytes()

if beautify:
try:
content = jsbeautifier.beautify(content)
except IndexError:
pass
return content


class PySparkGCSDataset(GCSDataset):
def __init__(
self,
spark_context: SparkContext,
base_dir: str,
bucket: str = "openwpm-data",
**kwargs,
) -> None:
"""Helper class to load OpenWPM datasets from GCS using PySpark

Parameters
----------
spark_context
            Spark context. In Databricks, this is available via the `sc`
variable.
base_dir : string
Directory within the bucket in which the dataset is saved.
bucket : string, optional
The bucket name on GCS. Defaults to `openwpm-data`.
"""
super().__init__(base_dir, bucket, **kwargs)
self._spark_context = spark_context
self._sql_context = SQLContext(spark_context)
self._table_location_format_string = (
f"gs://{self._table_location_format_string}"
)
self._incomplete_visit_ids = self.read_table(
"incomplete_visits", mode="all"
).select("visit_id")
crawl_history = self.read_table("crawl_history", mode="all")
self._failed_visit_ids = (
get_worst_status_per_visit_id(crawl_history)
.where(F.col("worst_status") != "ok")
.select("visit_id")
)

def read_table(
self, table_name: str, columns: Optional[List[str]] = None, mode: str = "successful"
) -> DataFrame:
"""Read `table_name` from OpenWPM dataset into a pyspark dataframe.

Parameters
----------
table_name : string
OpenWPM table to read
        columns : list of strings
            The subset of columns to read from the parquet dataset
        mode : string
            Valid values are "successful", "failed", and "all".
            Success is determined per visit_id: a visit_id is considered failed
            if one of its commands failed or if it appears in the interrupted table
"""
table = self._sql_context.read.parquet(
self._table_location_format_string % table_name
)
if columns is not None:
table = table.select(columns)
if mode == "all":
return table
if mode == "failed":
return table.join(self._failed_visit_ids, "visit_id", how="inner").union(
table.join(self._incomplete_visit_ids, "visit_id", how="inner")
)
if mode == "successful":
return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join(
self._incomplete_visit_ids, "visit_id", how="leftanti"
)
        else:
            raise AssertionError(
                f"Mode was {mode}, "
                "allowed modes are 'all', 'failed' and 'successful'"
            )
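
A hedged sketch of the three `mode` values on PySparkGCSDataset.read_table, together with the pandas-backed GCSDataset for use inside worker processes; `sc`, the base directory, and the `javascript` table/column names are assumptions for illustration:

from openwpm_utils.gcs import GCSDataset, PySparkGCSDataset

dataset = PySparkGCSDataset(sc, base_dir="my-crawl", bucket="openwpm-data")

# Default mode: only rows from visits whose commands all succeeded and
# that were not interrupted.
ok_js = dataset.read_table("javascript")

# Rows from visits that failed or were interrupted.
failed_js = dataset.read_table("javascript", mode="failed")

# Everything, unfiltered; optionally restrict the columns that are read.
all_js = dataset.read_table("javascript", columns=["visit_id", "script_url"], mode="all")

# Inside a Spark worker (no SparkContext available) the pandas-backed
# variant can be used instead.
pandas_dataset = GCSDataset(base_dir="my-crawl", bucket="openwpm-data")
js_pdf = pandas_dataset.read_table("javascript", columns=["visit_id", "script_url"])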