 
 with import_functions():
     from src.functions.rss.pull import rss_pull
-    from src.functions.crawl.website import crawl_website
-    from src.functions.helper.split_text import split_text
+    from src.functions.crawl.website import crawl_website, CrawlInput
+    from src.functions.helper.split_text import split_text, SplitTextInput
     from src.functions.llm.chat import llm_chat, FunctionInputParams
     from src.functions.rss.schema import RssInput
 
+from pydantic import BaseModel
+
+class RssWorkflowInput(BaseModel):
+    url: str
+    count: int
+
 @workflow.defn()
 class RssWorkflow:
     @workflow.run
-    async def run(self, input: dict):
+    async def run(self, input: RssWorkflowInput):
 
-        url = input["url"]
-        count = input["count"]
-        rss_results = await workflow.step(rss_pull, RssInput(url=url, count=count), start_to_close_timeout=timedelta(seconds=10))
+        url = input.url
+        count = input.count
+        rss_results = await workflow.step(
+            function=rss_pull,
+            function_input=RssInput(url=url, count=count),
+            start_to_close_timeout=timedelta(seconds=10))
         urls = [item['link'] for item in rss_results if 'link' in item]
         titles = [item['title'] for item in rss_results if 'title' in item]
 
@@ -25,8 +34,14 @@ async def run(self, input: dict):
             log.info("rss_result", extra={"url": url})
             if url:
                 try:
-                    content = await workflow.step(crawl_website, url, start_to_close_timeout=timedelta(seconds=30))
-                    split_content = await workflow.step(split_text, f"{titles[urls.index(url)]}\n\n{content}", start_to_close_timeout=timedelta(seconds=30))
+                    content = await workflow.step(
+                        function=crawl_website,
+                        function_input=CrawlInput(url=url),
+                        start_to_close_timeout=timedelta(seconds=30))
+                    split_content = await workflow.step(
+                        function=split_text,
+                        function_input=SplitTextInput(text=f"{titles[urls.index(url)]}\n\n{content}"),
+                        start_to_close_timeout=timedelta(seconds=30))
                     crawled_contents.append(split_content)
                 except Exception as e:
                     log.error(f"Failed to crawl {url}: {str(e)}")
@@ -35,12 +50,24 @@ async def run(self, input: dict):
         for split_content in crawled_contents:
             for content in split_content:
                 user_prompt = f"Provide a translation of the news article. Translate the following content to English: {content}"
-                translation = await workflow.step(llm_chat, FunctionInputParams(user_prompt=user_prompt), task_queue="llm_chat",start_to_close_timeout=timedelta(seconds=120))
+                translation = await workflow.step(
+                    function=llm_chat,
+                    function_input=FunctionInputParams(user_prompt=user_prompt),
+                    task_queue="llm_chat",
+                    start_to_close_timeout=timedelta(seconds=120))
 
                 user_prompt = f"Provide a summary of the news found on the RSS feed. Summarize the following content: {translation} in maximum 1 sentence with no more than 20 words"
-                summary = await workflow.step(llm_chat, FunctionInputParams(user_prompt=user_prompt), task_queue="llm_chat",start_to_close_timeout=timedelta(seconds=120))
+                summary = await workflow.step(
+                    function=llm_chat,
+                    function_input=FunctionInputParams(user_prompt=user_prompt),
+                    task_queue="llm_chat", start_to_close_timeout=timedelta(seconds=120))
                 summaries.append(summary)
 
         user_prompt = f"Make a daily digest of all the news and tell me what is the most important news. Here are the summaries of the articles: {summaries}."
 
-        return await workflow.step(llm_chat, FunctionInputParams(user_prompt=user_prompt), task_queue="llm_chat", start_to_close_timeout=timedelta(seconds=120))
+        return await workflow.step(
+            function=llm_chat,
+            function_input=FunctionInputParams(user_prompt=user_prompt),
+            task_queue="llm_chat",
+            start_to_close_timeout=timedelta(seconds=120)
+        )
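The keyword-style workflow.step calls assume that CrawlInput and SplitTextInput are Pydantic models exposing url and text fields, respectively; the real definitions live in src/functions/crawl/website.py and src/functions/helper/split_text.py and may differ. A hypothetical sketch of the shapes the call sites rely on:

from pydantic import BaseModel

# Hypothetical reconstructions inferred from the call sites in this diff;
# check the actual modules for the authoritative definitions.
class CrawlInput(BaseModel):
    url: str

class SplitTextInput(BaseModel):
    text: str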