diff --git a/zulip/integrations/rss/rss-bot b/zulip/integrations/rss/rss-bot
index 49c82fb62..faea7b780 100755
--- a/zulip/integrations/rss/rss-bot
+++ b/zulip/integrations/rss/rss-bot
@@ -14,7 +14,7 @@ import sys
 import time
 import urllib.parse
 from html.parser import HTMLParser
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List
 
 import feedparser
 from typing_extensions import override
@@ -23,7 +23,8 @@ import zulip
 
 VERSION = "0.9"
 RSS_DATA_DIR = os.path.expanduser(os.path.join("~", ".cache", "zulip-rss"))
-OLDNESS_THRESHOLD = 30
+EARLIEST_ENTRY_AGE = 30
+MAX_BATCH_SIZE = 100
 
 usage = """Usage: Send summaries of RSS entries for your favorite feeds to Zulip.
 
@@ -92,6 +93,22 @@ parser.add_argument(
     help="Convert $ to $$ (for KaTeX processing)",
     default=False,
 )
+parser.add_argument(
+    "--max-batch-size",
+    dest="max_batch_size",
+    type=int,
+    help="The maximum number of messages to send at once",
+    default=MAX_BATCH_SIZE,
+    action="store",
+)
+parser.add_argument(
+    "--earliest-entry-age",
+    dest="earliest_entry_age",
+    type=int,
+    help="The earliest date (relative to today) you want to process entries from (in days)",
+    default=EARLIEST_ENTRY_AGE,
+    action="store",
+)
 
 opts = parser.parse_args()
 
@@ -172,6 +189,11 @@ def elide_subject(subject: str) -> str:
     return subject
 
 
+def get_entry_time(entry: Any) -> tuple[float, bool]:
+    entry_time = entry.get("published_parsed", entry.get("updated_parsed"))
+    return (calendar.timegm(entry_time), True) if entry_time else (float("-inf"), False)
+
+
 def send_zulip(entry: Any, feed_name: str) -> Dict[str, Any]:
     body: str = entry.summary
     if opts.unwrap:
@@ -206,58 +228,53 @@ client: zulip.Client = zulip.Client(
     client="ZulipRSS/" + VERSION,
 )
 
-first_message = True
-
 for feed_url in feed_urls:
-    feed_file = os.path.join(opts.data_dir, urllib.parse.urlparse(feed_url).netloc)  # Type: str
+    feed_hashes_file = os.path.join(
+        opts.data_dir, urllib.parse.urlparse(feed_url).netloc
+    )  # Type: str
 
     try:
-        with open(feed_file) as f:
+        with open(feed_hashes_file) as f:
             old_feed_hashes = {line.strip(): True for line in f.readlines()}
     except OSError:
         old_feed_hashes = {}
 
-    new_hashes: List[str] = []
+    unhashed_entries: List[tuple[Any, str, float]] = []
    data = feedparser.parse(feed_url)
 
+    feed_name: str = data.feed.title or feed_url
+    # Safeguard to not process older entries in unordered feeds
+    entry_threshold = time.time() - opts.earliest_entry_age * 60 * 60 * 24
     for entry in data.entries:
         entry_hash = compute_entry_hash(entry)
-        # An entry has either been published or updated.
-        entry_time: Optional[Tuple[int, int]] = entry.get(
-            "published_parsed", entry.get("updated_parsed")
-        )
-        if (
-            entry_time is not None
-            and time.time() - calendar.timegm(entry_time) > OLDNESS_THRESHOLD * 60 * 60 * 24
-        ):
-            # As a safeguard against misbehaving feeds, don't try to process
-            # entries older than some threshold.
+        entry_time, is_time_tagged = get_entry_time(entry)
+        if (is_time_tagged and entry_time < entry_threshold) or entry_hash in old_feed_hashes:
             continue
-        if entry_hash in old_feed_hashes:
-            # We've already seen this. No need to process any older entries.
-            break
-        if not old_feed_hashes and len(new_hashes) >= 3:
-            # On a first run, pick up the 3 most recent entries. An RSS feed has
-            # entries in reverse chronological order.
-            break
-
-        feed_name: str = data.feed.title or feed_url
-
-        response: Dict[str, Any] = send_zulip(entry, feed_name)
-        if response["result"] != "success":
-            logger.error("Error processing %s", feed_url)
-            logger.error("%s", response)
-            if first_message:
-                # This is probably some fundamental problem like the stream not
-                # existing or something being misconfigured, so bail instead of
-                # getting the same error for every RSS entry.
-                log_error_and_exit("Failed to process first message")
-        # Go ahead and move on -- perhaps this entry is corrupt.
-        new_hashes.append(entry_hash)
-        first_message = False
-
-    with open(feed_file, "a") as f:
-        for hash in new_hashes:
-            f.write(hash + "\n")
-
-    logger.info("Sent zulips for %d %s entries", len(new_hashes), feed_url)
+        unhashed_entries.append((entry, entry_hash, entry_time))
+
+    # We process all entries to support unordered feeds,
+    # but post only the latest ones in chronological order.
+    sorted_entries = sorted(unhashed_entries, key=lambda x: x[2])[-opts.max_batch_size :]
+
+    with open(feed_hashes_file, "a") as f:
+        for entry_tuple in sorted_entries:
+            entry, entry_hash, _ = entry_tuple
+
+            response: Dict[str, Any] = send_zulip(entry, feed_name)
+            if response["result"] != "success":
+                logger.error("Error processing %s", feed_url)
+                logger.error("%s", response)
+                if not old_feed_hashes and entry_tuple == sorted_entries[0]:
+                    # This is probably some fundamental problem like the stream not
+                    # existing or something being misconfigured, so bail instead of
+                    # getting the same error for every RSS entry.
+                    log_error_and_exit("Failed to process first message")
+            # Go ahead and move on -- perhaps this entry is corrupt.
+            f.write(entry_hash + "\n")
+
+    logger.info(
+        "Processed %d entries from %s and sent %d zulips",
+        len(unhashed_entries),
+        feed_url,
+        len(sorted_entries),
+    )
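
For reviewers: a minimal, self-contained sketch of the sort-and-slice batching the new loop performs. All data below is hypothetical stand-in data (the tuples mirror unhashed_entries, and max_batch_size stands in for opts.max_batch_size); note that entries without a timestamp carry float("-inf") from get_entry_time, so they sort first and are the first to be trimmed away when a feed exceeds the batch size.

# Hypothetical illustration only; not part of the patch.
max_batch_size = 2  # stand-in for opts.max_batch_size

# (entry, entry_hash, entry_time) tuples, mirroring unhashed_entries
unhashed_entries = [
    ({"title": "newest"}, "hash-c", 1700000300.0),
    ({"title": "older"}, "hash-a", 1700000100.0),
    ({"title": "untimestamped"}, "hash-u", float("-inf")),
]

# Sort oldest-first, then keep only the newest max_batch_size entries,
# so messages are posted in chronological order.
sorted_entries = sorted(unhashed_entries, key=lambda x: x[2])[-max_batch_size:]
print([entry_hash for _, entry_hash, _ in sorted_entries])  # ['hash-a', 'hash-c']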