Introduction
Today we’ll learn how to build and deploy a FastAPI application with a task queue via Celery. We will also create a chat interface using Solara and OpenAI. Then, we’ll deploy it as a single full-stack application using the Ploomber Cloud command-line interface.
The full source code can be found here.
What are we building?
The application is a chatbot that will answer questions about GitHub repositories. We'll use LlamaIndex, which has features for indexing GitHub repositories and chatting with an LLM. This implementation is adapted from an application by Sophia Yang and Marc Skov Madsen.
Architecture
Here’s a diagram to help understand the overall structure of the application:
FastAPI
We first get our basic API running. It will have 3 main endpoints:
- /scrape, which loads a repository from GitHub,
- /status, which returns the loading status of a repository, and
- /ask, which answers a user's question about a repository.
For more detailed information on setting up an API, see the FastAPI docs.
The API should be defined in app.py and will look something like this:
from typing import Any

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Repo(BaseModel):
    owner: str
    name: str
    branch: str

class Question(BaseModel):
    repo_id: str
    question: str

@app.post('/scrape')
def scrape(repo: Repo) -> dict[str, Any]:
    # Enter into DB
    id = f"{repo.owner}-{repo.name}-{repo.branch}"
    status = None  # placeholder; we'll wire this to the DB later
    path = None
    return {'id': id, 'status': status, 'path': path}

@app.get('/status/{repo_id}')
def status(repo_id: str) -> dict[str, Any]:
    status = None
    return {'id': repo_id, 'status': status}

@app.post('/ask')
def ask(question: Question) -> dict[str, Any]:
    answer = answer_question(question.repo_id, question.question)
    return {'question': question, 'answer': answer}
We’ll run the API with this command:
uvicorn app:app --reload
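With the server running, you can sanity-check the stub endpoints. Here's a minimal sketch using the requests library (an extra dependency just for testing, not part of the app); it assumes uvicorn's default address of http://localhost:8000 and an arbitrary example repository:

import requests

BASE = "http://localhost:8000"  # uvicorn's default host and port

# Submit a repository to be scraped
resp = requests.post(f"{BASE}/scrape", json={"owner": "fastapi", "name": "fastapi", "branch": "master"})
print(resp.json())  # {'id': 'fastapi-fastapi-master', 'status': None, 'path': None}

# Check its loading status (still a placeholder at this point)
print(requests.get(f"{BASE}/status/fastapi-fastapi-master").json())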
Task Queue with Celery
Now that our API is working, we'll implement a task queue to process GitHub repositories in the background. For more in-depth information on using FastAPI + Celery, see this excellent tutorial.
What is Celery? How is it useful?
Celery is a task queue with a focus on real-time processing that also supports task scheduling. But what is a task queue? From Celery's documentation:
Task queues are used as a mechanism to distribute work across threads or machines. A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.
Using Celery, you can deploy workers that execute tasks in the background so they don't block your application during long-running jobs. Later, we'll implement a function that indexes GitHub repositories, which can take up to a few minutes. We'll use Celery to run that process in the background so that our API can still respond to other requests in the meantime.
Create a task
Create a file called task.py, which contains a dummy task:
import os
import time

from celery import Celery

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
app = Celery(__name__, broker=redis_url, backend=redis_url)

@app.task
def dummy_task(repo_name):
    time.sleep(5)
    print(f"Done! {repo_name}")
To call this task from the API, we simply import it and call it like a function, appending .delay:
import task

@app.post('/scrape')
def scrape(repo: Repo) -> dict[str, Any]:
    id = f"{repo.owner}-{repo.name}-{repo.branch}"
    task.dummy_task.delay(id)
    return {"id": id}
Launch a worker
To run Celery, you first need an instance of Redis running. In another terminal window, install the Redis Python client and start a Redis server with these commands (make sure Docker is running):
pip install redis
docker run --rm --name some-redis -p 6379:6379 redis:latest
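To confirm the Python client can reach the container, a one-line sanity check:

import redis

# Should print True if the Redis container is accepting connections
print(redis.Redis(host="localhost", port=6379).ping())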
Now you can run Celery like this:
celery --app=task.app worker --concurrency=1 --loglevel=DEBUG
For convenience, install watchmedo (part of the watchdog package) and run Celery with live reloading:
pip install 'watchdog[watchmedo]'
watchmedo auto-restart --directory=./ --pattern=task.py -- celery --app=task.app worker --concurrency=1 --loglevel=DEBUG
LlamaIndex
To download and index GitHub repositories, we use the GithubRepositoryReader from llama_index. It downloads a GitHub repository given a username, repository name, and branch, then creates a VectorStoreIndex, which we can pass to the llama_index LLM to answer user questions.
Ensure you have llama_index installed:
pip install llama-index-core llama-index-readers-github llama_index
In task.py, create a new task called download_repo:
import os
import pickle
from pathlib import Path

from llama_index.core import VectorStoreIndex
from llama_index.readers.github import GithubClient, GithubRepositoryReader

# RepoModel and db_session come from the API's database module (see the source code)
INDEXES = Path("indexes")

@app.task(bind=True)
def download_repo(self, id, owner, name, branch, path):
    RepoModel.update_repo_status(id, "pending")
    db_session.commit()
    try:
        gh_token = os.getenv("GITHUB_TOKEN")
        client = GithubClient(gh_token)
        loader = GithubRepositoryReader(
            github_client=client,
            owner=owner,
            repo=name,
            use_parser=False,
            verbose=True,
            timeout=None,
            retries=2,
            concurrent_requests=1,
        )
        docs = loader.load_data(branch=branch)
        index = VectorStoreIndex.from_documents(docs)
    except Exception as er:
        RepoModel.update_repo_status(id, "failed")
        db_session.commit()
        self.update_state(state='FAILURE', meta={'exc': er})
        raise
    INDEXES.mkdir(exist_ok=True)
    full_path = INDEXES / path
    with full_path.open("wb") as f:
        pickle.dump(index, f, pickle.HIGHEST_PROTOCOL)
    RepoModel.update_repo_status(id, "finished")
    db_session.commit()
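The task above relies on RepoModel and db_session, which live in the API's database module; see the source code for the real implementation. As a rough sketch of what they might look like with SQLAlchemy (the table and column names here are assumptions):

from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

engine = create_engine("sqlite:///repos.db")
db_session = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()

class RepoModel(Base):
    __tablename__ = "repos"

    id = Column(String, primary_key=True)  # e.g. "owner-name-branch"
    status = Column(String)                # pending / finished / failed
    path = Column(String)                  # filename of the pickled index

    @classmethod
    def update_repo_status(cls, id, status):
        db_session.query(cls).filter_by(id=id).update({"status": status})

    @classmethod
    def get_repo_status(cls, id):
        return db_session.query(cls).filter_by(id=id).one().status

    @classmethod
    def get_repo_path(cls, id):
        return db_session.query(cls).filter_by(id=id).one().path

In this sketch, api/create_models.py (used later in the Dockerfile) would simply call Base.metadata.create_all(engine) to create the tables.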
We'll call it in /scrape in app.py:
@app.post('/scrape')
def scrape(repo: Repo) -> dict[str, Any]:
    id = f"{repo.owner}-{repo.name}-{repo.branch}"
    status = None
    path = None

    # Check if repo has already been entered
    try:
        status = RepoModel.get_repo_status(id)
    except Exception:
        pass

    if status == "pending":
        raise HTTPException(status_code=500, detail="Repo is being parsed.")
    elif status == "finished":
        raise HTTPException(status_code=500, detail="Repo already parsed successfully.")
    elif status == "failed":
        path = RepoModel.get_repo_path(id)
        status = "pending"
        RepoModel.update_repo_status(id, status)
    else:  # status is None: repo hasn't been entered yet
        path = f"{repo.owner}_{repo.name}_{repo.branch}.pickle"
        status = "pending"
        repoModel = RepoModel(id=id, status=status, path=path)
        db_session.add(repoModel)

    try:
        db_session.commit()
    except Exception as e:
        db_session.rollback()
        raise HTTPException(status_code=500, detail="Error submitting to database.") from e

    # Send download task to celery
    task.download_repo.delay(id=id, owner=repo.owner, name=repo.name, branch=repo.branch, path=path)
    return {'id': id, 'status': status, 'path': path}
In /ask, we'll use the VectorStoreIndex to answer the user's questions:
@app.post('/ask')
def ask(question: Question) -> dict[str, Any]:
    answer = answer_question(question.repo_id, question.question)
    return {'question': question, 'answer': answer}

def answer_question(repo_id, question):
    try:
        path = INDEXES / RepoModel.get_repo_path(repo_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail="Repo not found.") from e

    if not Path(path).exists():
        raise HTTPException(status_code=500, detail="Could not load index")

    with open(path, "rb") as f:
        index = pickle.load(f)

    chat_engine = index.as_chat_engine(chat_mode="context", verbose=True)
    response = chat_engine.chat(question)
    return response.response
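With all three endpoints in place, a client can submit a repository, poll until indexing finishes, and then ask questions. Here's a sketch of that flow using the requests library; it assumes the API is running locally on port 8000 and uses an arbitrary example repository:

import time

import requests

BASE = "http://localhost:8000"

repo = {"owner": "fastapi", "name": "fastapi", "branch": "master"}
repo_id = requests.post(f"{BASE}/scrape", json=repo).json()["id"]

# Poll until the Celery worker finishes indexing the repository
while requests.get(f"{BASE}/status/{repo_id}").json()["status"] != "finished":
    time.sleep(5)

payload = {"repo_id": repo_id, "question": "What does this repository do?"}
print(requests.post(f"{BASE}/ask", json=payload).json()["answer"])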
Potential issues
Sometimes the load_data function from GithubRepositoryReader will fail due to an HTTP error, a JSON parsing error, or another intermediate error. We've run into a fair number of issues with this tool, but have found the best results with these parameters:
- timeout = None
- concurrent_requests = 2
- retries = 2
Putting it all together, it should look like this:
loader = GithubRepositoryReader(
    github_client=client,
    owner=owner,
    repo=name,
    use_parser=False,
    verbose=True,
    timeout=None,
    retries=2,
    concurrent_requests=2,
)
Chat interface
We'll use Solara for the chat interface. It uses a Sidebar for the loading functionality and the chat components from solara.lab for the chat itself. View the source code to get the full picture. Here's a snippet:
with sl.Column():
    sl.Title("Chat with Github")
    with sl.Sidebar():
        sl.HTML(tag="h1", unsafe_innerHTML="Load a Repository", style={"padding-top": "10px", "padding-bottom": "10px", "padding-left": "10px"})
        with sl.Card():
            sl.InputText("Owner", value=owner, on_value=set_owner)
            sl.InputText("Repo", value=repo, on_value=set_repo)
            sl.InputText("Branch", value=branch, on_value=set_branch)
            if error_msg:
                sl.Error(label=f"{error_msg}", icon=True)
            sl.Button("Load", on_click=load_repo, color="primary", style={"width": "100%", "height": "7vh", "margin-bottom": "1vh"})
            sl.Button("Clear", on_click=clear, style={"width": "100%", "margin-bottom": "1vh"})
            sl.Button("Reset DB", on_click=handle_reset, style={"width": "100%"})
        with sl.HBox(align_items="center"):
            sl.HTML(tag="h1", unsafe_innerHTML="Repositories", style={"padding-top": "10px", "padding-bottom": "10px", "padding-left": "10px"})
            sl.Button("Update", on_click=update_statuses, style={"margin-left": "10px", "padding-right": "20px", "right": "20px", "position": "absolute"})
        for this_repo in loaded_repos.value:
            with sl.Card():
                with sl.Columns([7.5, 2.5]):
                    sl.Markdown(this_repo.id)
                    with sl.HBox():
                        sl.Markdown(f"__{this_repo.status}__", style={"color": status_color(this_repo.status)})
    with sl.VBox(classes=["chat-area"]):
        with sl.lab.ChatBox(style={"height": "70vh", "padding-bottom": "20px"}):
            for message in messages:
                with sl.lab.ChatMessage(
                    user=message.role == "user",
                    avatar=sl.Image(f"client/static/{message.role}-logo.png", classes=["avatar"]),
                    avatar_background_color="#ffffff",
                    name=message.role.capitalize(),
                    color="#7baded" if message.role == "user" else "rgba(0,0,0, 0.06)",
                    notch=True,
                ):
                    sl.Markdown(message.content)
        if num_finished < 1:
            sl.Warning("Load a repository to start chatting.")
        if chat_error:
            sl.Error("Error answering your question. Please refresh.")
        sl.lab.ChatInput(send_callback=ask_chatgpt, disabled=disabled)
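The snippet references callbacks such as load_repo, update_statuses, and ask_chatgpt, which call the FastAPI backend; see the source code for the real versions. As a rough sketch, load_repo might post to /scrape like this (owner, repo, branch, set_error_msg, and update_statuses are the component's reactive state and helpers from above):

import os

import requests

API_ROOT = os.getenv("API_ROOT", "http://localhost:8765")

def load_repo():
    resp = requests.post(f"{API_ROOT}/scrape", json={"owner": owner, "name": repo, "branch": branch})
    if resp.status_code == 200:
        update_statuses()  # refresh the repository list in the sidebar
    else:
        set_error_msg(resp.json().get("detail", "Something went wrong."))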
Deploy
We can deploy the application using Ploomber Cloud. The Ploomber Cloud command-line interface makes it easy to deploy apps and even has features for secrets and automated deployments. Since this application contains a Solara frontend and a FastAPI + Celery backend, we'll need to take some extra measures to get it all running in a single container.
This is the file structure of the app:
- api/, a folder which contains the backend files
- client/, which contains the frontend files
- .env, requirements.txt, Dockerfile, and start.sh
Environment variables
Create a .env file and make sure to include these environment variables:
OPENAI_API_KEY=your_openai_key
GITHUB_TOKEN=your_github_token
API_ROOT="http://0.0.0.0:8765"
Requirements
To deploy, you need a requirements.txt file. It should look like this:
uvicorn==0.21.1
fastapi==0.95.0
redis==4.5.4
celery==5.2.7
watchdog
python-dotenv==1.0.0
sqlalchemy
llama-index-core
llama-index-readers-github
llama_index
humanize
solara==1.29.1
openai
Dockerfile
We'll deploy this as a docker app, so we need a Dockerfile:
FROM python:3.11
COPY api/ /api/
COPY client/ /client/
COPY start.sh start.sh
COPY requirements.txt requirements.txt
COPY .env .env
RUN apt-get update && apt-get -y install redis-server
RUN pip install -r requirements.txt --no-cache-dir
RUN python api/create_models.py
ENTRYPOINT ["sh", "start.sh", "--port", "80"]
Some notes:
- We copy over every file; take note of the api/ and client/ folders
- Before starting, we must create the database using python api/create_models.py
- Ploomber requires apps to run on port 80, so we specify that in ENTRYPOINT
- ENTRYPOINT runs a script that starts each process in a specific sequence. More details below:
start.sh
We use this shell script to run FastAPI, Redis, Celery, and Solara concurrently. Importantly, the FastAPI server must be running before Solara starts, or else Solara will fail. To achieve this, we make use of sleep, and we use the & operator to run multiple commands at once. It looks like this:
#!/bin/bash
sleep 10 && solara run client/main.py --host=0.0.0.0 --port=80 &
(nohup redis-server &
celery --app=api.task.app worker --concurrency=1 --loglevel=DEBUG &
uvicorn api.app:app --host=0.0.0.0 --port=8765)
Ploomber Cloud
Now we're ready to deploy. If you haven't yet, sign up and get an API key.
Ensure you’re in the root project directory and run:
ploomber-cloud init
You should see a ploomber_cloud.json file created. Now deploy:
ploomber-cloud deploy --watch
Troubleshooting
To verify the Docker build works locally, you can run:
docker build -t api-app .
Followed by:
docker run -dp 127.0.0.1:3000:80 api-app
Using Docker Desktop, it’s easy to verify that your app is running in a single container.
Conclusion
That’s it! In this tutorial we learned how to:
- Build an API with FastAPI
- Implement a task queue with Celery
- Build a chat interface with Solara
- Deploy a full-stack application on Ploomber Cloud
As always, take note of the small details and tips. Happy coding!