Stable v1.8.0
Python SDK
skyaiapp
The official Python SDK, with synchronous and asynchronous clients and full type annotations. Compatible with Python 3.9+.
Installation
pip install skyaiapp

Also available via poetry and conda.

Requirements: Python 3.9+
Quick Start
import os

from skyaiapp import SkyAI

# Initialize the client
sky = SkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])

# Make a routing request
response = sky.route(
    goal="cost",
    strategy="balanced",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello, world!"}
    ]
)

print(response.choices[0].message.content)
print(f"Model: {response.model}")
print(f"Cost: ${response.routing.cost_usd}")
Async Support

import asyncio
import os

from skyaiapp import AsyncSkyAI

async def main():
    sky = AsyncSkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])

    # Async routing
    response = await sky.route(
        goal="quality",
        messages=[{"role": "user", "content": "Hello!"}]
    )
    print(response.choices[0].message.content)

    # Concurrent requests
    tasks = [
        sky.route(goal="cost", messages=[{"role": "user", "content": f"Query {i}"}])
        for i in range(5)
    ]
    responses = await asyncio.gather(*tasks)

    await sky.close()

asyncio.run(main())
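When fanning out many concurrent requests, it is usually worth bounding concurrency. A minimal sketch using asyncio.Semaphore, assuming only the route call shown above:

import asyncio

# Bound concurrent routing calls with a semaphore (illustrative pattern,
# not an SDK feature).
async def route_many(sky, prompts, limit=5):
    sem = asyncio.Semaphore(limit)

    async def one(prompt):
        async with sem:
            return await sky.route(
                goal="cost",
                messages=[{"role": "user", "content": prompt}],
            )

    return await asyncio.gather(*(one(p) for p in prompts))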
Core Features

Streaming

# Synchronous streaming
stream = sky.route(
    goal="quality",
    messages=[{"role": "user", "content": "Write a story..."}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

# Async streaming
async for chunk in await sky.route(..., stream=True):
    print(chunk.choices[0].delta.content, end="")
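If you need the full text after streaming, accumulate the deltas as they arrive. A minimal sketch, assuming the chunk shape shown above:

# Collect a streamed response into a single string (illustrative).
parts = []
for chunk in sky.route(
    goal="quality",
    messages=[{"role": "user", "content": "Write a story..."}],
    stream=True,
):
    delta = chunk.choices[0].delta.content
    if delta:
        parts.append(delta)

full_text = "".join(parts)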
Agent Runtime

import os

from skyaiapp import SkyAI, Tool

sky = SkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])

# Create an agent with built-in tools
agent = sky.create_agent(
    tools=["web_search", "calculator", "code_exec"],
    max_steps=10,
    sandbox=True
)

# Run a task with a step callback
def on_step(step):
    print(f"Step {step.number}: {step.action} ({step.duration_ms}ms)")

result = agent.run(
    task="Find the current Bitcoin price and calculate 10% of it",
    on_step=on_step
)

print(f"Result: {result.output}")
print(f"Total cost: ${result.usage.total_cost_usd}")

# Custom tool with decorator
@sky.tool("get_weather")
def get_weather(location: str) -> dict:
    """Get current weather for a location."""
    return {"temp": 72, "condition": "sunny", "location": location}

agent = sky.create_agent(tools=["web_search", get_weather])
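Once the custom tool is registered, the agent runs the same way as before. A sketch reusing the agent and on_step callback defined above:

# Run the agent that includes the custom get_weather tool (illustrative).
result = agent.run(
    task="What's the weather in San Francisco right now?",
    on_step=on_step,
)
print(result.output)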
Guardrails

# PII filtering
response = sky.route(
    goal="quality",
    messages=[{"role": "user", "content": user_input}],
    guardrails={
        "pii": {
            "enabled": True,
            "types": ["email", "phone", "ssn"],
            "action": "redact"
        }
    }
)

# Content moderation
response = sky.route(
    goal="quality",
    messages=[{"role": "user", "content": user_input}],
    guardrails={
        "moderation": {
            "enabled": True,
            "categories": ["hate", "violence"],
            "threshold": 0.7
        }
    }
)

if response.guardrails and response.guardrails.blocked:
    print(f"Content blocked: {response.guardrails.reason}")
RAG Integration

# Create a knowledge base
kb = sky.knowledge_base.create(
    name="product-docs",
    description="Product documentation"
)

# Ingest documents
sky.knowledge_base.ingest(
    kb.id,
    documents=[
        {"content": "...", "metadata": {"source": "docs/api.md"}},
        {"content": "...", "metadata": {"source": "docs/guide.md"}}
    ],
    chunking={"strategy": "semantic", "max_tokens": 512}
)

# Query with RAG
response = sky.route(
    goal="quality",
    messages=[{"role": "user", "content": "How do I authenticate?"}],
    rag={
        "knowledge_base_id": kb.id,
        "top_k": 5,
        "min_relevance": 0.7
    }
)

print(response.choices[0].message.content)
print("Sources:", response.rag.sources)
Type Hints

from skyaiapp import SkyAI
from skyaiapp.types import (
    RouteRequest,
    RouteResponse,
    Message,
    Goal,
    Strategy,
    Agent,
    AgentRunResult
)

# Full type hint support
request: RouteRequest = {
    "goal": "cost",
    "strategy": "balanced",
    "messages": [
        {"role": "system", "content": "..."},
        {"role": "user", "content": "..."}
    ]
}

response: RouteResponse = sky.route(**request)

# IDE autocomplete works on the typed response
print(response.choices[0].message.content)
print(response.routing.cost_usd)
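The exported types also make it easy to annotate your own helpers. A sketch (the helper itself is hypothetical; only the imported types come from the SDK):

# A typed convenience function (illustrative; assumes Message and Goal
# behave like the dict and literal values shown above).
def route_messages(sky: SkyAI, messages: list[Message], goal: Goal = "cost") -> RouteResponse:
    return sky.route(goal=goal, messages=messages)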
Error Handling

import os
import time

from skyaiapp import SkyAI
from skyaiapp.exceptions import (
    SkyAIError,
    RateLimitError,
    AuthenticationError,
    ValidationError
)

try:
    response = sky.route(...)
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")
    time.sleep(e.retry_after)
except AuthenticationError:
    print("Invalid API key")
except ValidationError as e:
    print(f"Invalid request: {e.message}")
except SkyAIError as e:
    print(f"API error: {e.code} - {e.message}")

# With automatic retries
sky = SkyAI(
    api_key=os.environ["SKYAIAPP_API_KEY"],
    max_retries=3,
    retry_delay=1.0
)
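If you need more control than max_retries provides, you can wrap route in your own backoff loop. A sketch, assuming the RateLimitError and retry_after attribute shown above:

# Manual exponential backoff on rate limits (illustrative pattern,
# not an SDK feature).
def route_with_backoff(sky, max_attempts=5, **kwargs):
    delay = 1.0
    for attempt in range(max_attempts):
        try:
            return sky.route(**kwargs)
        except RateLimitError as e:
            if attempt == max_attempts - 1:
                raise
            time.sleep(e.retry_after or delay)
            delay *= 2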
Framework Integration

FastAPI

import os

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from skyaiapp import AsyncSkyAI

app = FastAPI()
sky = AsyncSkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])

class ChatRequest(BaseModel):
    messages: list[dict]

@app.post("/api/chat")
async def chat(request: ChatRequest):
    response = await sky.route(
        goal="cost",
        messages=request.messages
    )
    return response.model_dump()

@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        stream = await sky.route(
            goal="quality",
            messages=request.messages,
            stream=True
        )
        async for chunk in stream:
            yield f"data: {chunk.model_dump_json()}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
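On the client side, the SSE endpoint can be consumed with any HTTP client that supports streaming. A sketch using httpx (an assumption; any streaming-capable client works, and the URL assumes a local dev server):

# Consume the /api/chat/stream endpoint above with httpx (illustrative).
import httpx

with httpx.stream(
    "POST",
    "http://localhost:8000/api/chat/stream",
    json={"messages": [{"role": "user", "content": "Hello!"}]},
    timeout=None,
) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            print(line[len("data: "):])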
Django

# views.py
import json

from django.conf import settings
from django.http import JsonResponse, StreamingHttpResponse

from skyaiapp import SkyAI

sky = SkyAI(api_key=settings.SKYAIAPP_API_KEY)

def chat_view(request):
    data = json.loads(request.body)
    response = sky.route(
        goal="cost",
        messages=data["messages"]
    )
    return JsonResponse(response.model_dump())

def chat_stream_view(request):
    data = json.loads(request.body)

    def event_stream():
        stream = sky.route(
            goal="quality",
            messages=data["messages"],
            stream=True
        )
        for chunk in stream:
            yield f"data: {chunk.model_dump_json()}\n\n"

    return StreamingHttpResponse(
        event_stream(),
        content_type="text/event-stream"
    )
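The views still need URL routes. A minimal urls.py sketch (the paths are arbitrary and only assume the views defined above):

# urls.py -- wire up the views above (illustrative).
from django.urls import path

from . import views

urlpatterns = [
    path("api/chat", views.chat_view),
    path("api/chat/stream", views.chat_stream_view),
]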
See more examples

Full example projects are available on GitHub.