Streamware is a modular, component-based architecture for CurLLM inspired by Apache Camel. It provides URI-based routing and composable pipelines for web automation workflows.
Streamware enables you to build powerful web automation pipelines using a declarative, composable syntax:
from curllm_core.streamware import flow
# Simple pipeline
result = (
flow("curllm://browse?url=https://example.com&stealth=true")
| "curllm://extract?instruction=Get all product prices"
| "transform://csv"
| "file://write?path=products.csv"
).run()
Components are reusable processing units that handle specific tasks.
Each component is identified by a URI scheme:
curllm://action?param1=value1&param2=value2
http://api.example.com/endpoint?method=post
file://write?path=/tmp/output.json
transform://jsonpath?query=$.items[*]
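To make the URI convention concrete, here is a minimal sketch of how such a URI could be decomposed using only the standard library. It is not Streamware's own `StreamwareURI` parser; the helper name `parse_component_uri` and the choice to treat the part after `//` as the action are assumptions made for illustration.

```python
from urllib.parse import urlsplit, parse_qsl


def parse_component_uri(uri):
    """Split a component URI into (scheme, action, params).

    Hypothetical helper: for schemes such as curllm:// or file://,
    the segment right after "//" is treated as the action name.
    """
    parts = urlsplit(uri)
    action = parts.netloc
    params = dict(parse_qsl(parts.query))
    return parts.scheme, action, params


scheme, action, params = parse_component_uri(
    "curllm://browse?url=https://example.com&stealth=true"
)
print(scheme, action, params)
```

Note that all parameter values arrive as strings, so a component would still need to convert flags such as `stealth=true` to booleans itself.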
Chain components using the pipe operator (|):
flow("source") | "transform" | "destination"
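The pipe syntax relies on Python's operator overloading. The following toy class sketches the general idea with plain callables as steps; `MiniFlow` is a hypothetical name, and the real `Flow` class resolves URI strings to components instead.

```python
class MiniFlow:
    """Toy pipeline: each step is a callable that takes and returns data."""

    def __init__(self, *steps):
        self.steps = list(steps)

    def __or__(self, step):
        # `flow | step` returns a new pipeline with the step appended.
        return MiniFlow(*self.steps, step)

    def run(self, data=None):
        # Feed the output of each step into the next one.
        for step in self.steps:
            data = step(data)
        return data


pipeline = MiniFlow(lambda _: [3, 1, 2]) | sorted | (lambda xs: [x * 10 for x in xs])
result = pipeline.run()
print(result)  # [10, 20, 30]
```

Returning a new object from `__or__` keeps partially built pipelines reusable, which is one reasonable design choice among several.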
Scheme: curllm://
Actions:
- browse: Navigate and interact with pages
- extract: Extract data using LLM
- fill_form: Fill web forms
- screenshot: Take screenshots
- bql: Execute BQL queries
- execute: Direct executor call

Examples:
# Browse with stealth mode
flow("curllm://browse?url=https://example.com&stealth=true&visual=false")
# Extract data
flow("curllm://extract?url=https://news.ycombinator.com&instruction=Get top 10 stories")
# Fill form
(
    flow("curllm://fill_form?url=https://example.com/contact")
    .with_data({"name": "John", "email": "john@example.com"})
)
# BQL query
flow("curllm://bql").with_data({
"query": 'page(url: "https://example.com") { title, links { text, url } }'
})
Scheme: http://, https://, web://
Methods: GET, POST, PUT, DELETE, PATCH
Examples:
# GET request
flow("http://api.example.com/data")
# POST request
(
    flow("http://api.example.com/users?method=post")
    .with_data({"name": "John", "email": "john@example.com"})
)
# With headers
flow("http://api.example.com/data?header_Authorization=Bearer token")
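The `header_` prefix suggests that some query parameters are turned into HTTP headers. A minimal sketch of that separation, assuming a simple prefix rule (the helper `split_headers` is hypothetical and not part of Streamware):

```python
from urllib.parse import urlsplit, parse_qsl


def split_headers(uri, prefix="header_"):
    """Separate header_* query parameters from the remaining options."""
    params = dict(parse_qsl(urlsplit(uri).query))
    headers = {k[len(prefix):]: v for k, v in params.items() if k.startswith(prefix)}
    options = {k: v for k, v in params.items() if not k.startswith(prefix)}
    return headers, options


headers, options = split_headers(
    "http://api.example.com/data?method=post&header_Authorization=Bearer token"
)
print(headers)
print(options)
```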
Scheme: file://
Operations:
- read: Read file content
- write: Write to file
- append: Append to file
- exists: Check file existence
- delete: Delete file

Examples:
# Read JSON file
flow("file://read?path=/tmp/data.json")
# Write data
(
    flow("file://write?path=/tmp/output.json")
    .with_data({"message": "Hello"})
)
# Append to log
(
    flow("file://append?path=/tmp/app.log")
    .with_data("Log entry\n")
)
Scheme: transform://, jsonpath://, csv://
Types:
- json: Parse/serialize JSON
- jsonpath: Extract using JSONPath
- csv: Convert to CSV
- normalize: Normalize data structure
- flatten: Flatten nested objects

Examples:
# Extract with JSONPath
flow("transform://jsonpath?query=$.items[*].name")
# Convert to CSV
flow("transform://csv?delimiter=,")
# Normalize data
flow("transform://normalize")
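As an illustration of what a flatten step followed by a CSV step might do to the data, here is a standalone sketch. The dotted key naming and the `flatten` helper are assumptions; the output format of the built-in transform components may differ.

```python
import csv
import io


def flatten(obj, parent_key="", sep="."):
    """Flatten nested dicts into one level, joining keys with `sep`."""
    flat = {}
    for key, value in obj.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


records = [
    {"id": 1, "price": {"amount": 9.5, "currency": "USD"}},
    {"id": 2, "price": {"amount": 12.0, "currency": "EUR"}},
]
rows = [flatten(record) for record in records]

# Serialize the flattened rows as CSV text.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(rows[0]))
writer.writeheader()
writer.writerows(rows)
csv_text = buffer.getvalue()
print(csv_text)
```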
Process items individually then collect results:
from curllm_core.streamware import split, join
result = (
flow("http://api.example.com/items")
| split("$.items[*]") # Split array
| "curllm://extract?instruction=Get details" # Process each
| join() # Collect results
| "file://write?path=results.json"
).run()
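Conceptually, split turns one message into many, each item is processed on its own, and join gathers the results again. A plain-Python sketch of that idea, using hypothetical helpers rather than Streamware's `split` and `join`:

```python
def split_items(payload, key):
    """Yield each element of payload[key] as a separate message."""
    yield from payload[key]


def join_items(messages):
    """Collect individually processed messages back into one list."""
    return list(messages)


payload = {"items": [{"id": 1}, {"id": 2}, {"id": 3}]}

# Process each item on its own, then gather the results.
processed = ({**item, "checked": True} for item in split_items(payload, "items"))
result = join_items(processed)
print(result)
```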
Send data to multiple destinations:
from curllm_core.streamware import multicast
(
    flow("curllm://extract?url=https://example.com&instruction=Get data")
    | multicast([
        "file://write?path=/tmp/backup.json",
        "transform://csv",
        "http://api.example.com/webhook?method=post",
    ])
)
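The multicast idea can be sketched without any library: every destination receives its own copy of the same payload. The function and destination names here are illustrative only, and copying the payload per destination is an assumption of this sketch.

```python
import copy


def multicast_sketch(data, destinations):
    """Send an independent copy of `data` to every destination callable."""
    return [destination(copy.deepcopy(data)) for destination in destinations]


backup = []


def to_backup(data):
    backup.append(data)
    return "saved"


def to_csv(data):
    return ",".join(str(value) for value in data.values())


outcomes = multicast_sketch({"a": 1, "b": 2}, [to_backup, to_csv])
print(outcomes)  # ['saved', '1,2']
```

Copying the payload prevents one destination from mutating what the others see.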
Route based on conditions:
from curllm_core.streamware import choose
(
    flow("http://api.example.com/events")
    | choose()
        .when("$.priority == 'high'", "file://write?path=/tmp/high.json")
        .when("$.priority == 'low'", "file://write?path=/tmp/low.json")
        .otherwise("file://write?path=/tmp/default.json")
)
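The when/otherwise chain is a first-match router. The sketch below shows the pattern with Python predicates in place of the string conditions used above; `Chooser` is a hypothetical class, not the object returned by `choose()`.

```python
class Chooser:
    """First-match conditional router with a fallback target."""

    def __init__(self):
        self._branches = []
        self._default = None

    def when(self, predicate, target):
        self._branches.append((predicate, target))
        return self  # returning self enables method chaining

    def otherwise(self, target):
        self._default = target
        return self

    def route(self, message):
        # Branches are checked in the order they were declared.
        for predicate, target in self._branches:
            if predicate(message):
                return target
        return self._default


router = (
    Chooser()
    .when(lambda m: m.get("priority") == "high", "/tmp/high.json")
    .when(lambda m: m.get("priority") == "low", "/tmp/low.json")
    .otherwise("/tmp/default.json")
)
print(router.route({"priority": "high"}))
print(router.route({"priority": "medium"}))
```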
Filter data based on conditions:
(
    flow("http://api.example.com/users")
    | "filter://condition?field=age&min=18&max=65"
    | "file://write?path=/tmp/filtered_users.json"
)
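One plausible reading of `field=age&min=18&max=65` is a range filter on a record field. The sketch below assumes inclusive bounds and drops records that lack the field; both are assumptions, since the component's exact semantics are not stated here.

```python
def in_range(records, field, minimum, maximum):
    """Keep records whose `field` lies within [minimum, maximum] (inclusive)."""
    return [
        record
        for record in records
        if field in record and minimum <= record[field] <= maximum
    ]


users = [
    {"name": "Ann", "age": 17},
    {"name": "Bob", "age": 18},
    {"name": "Cyd", "age": 40},
    {"name": "Dee", "age": 65},
    {"name": "Eli", "age": 70},
    {"name": "Fay"},  # no age field, so it is dropped
]
filtered = in_range(users, "age", 18, 65)
print([user["name"] for user in filtered])  # ['Bob', 'Cyd', 'Dee']
```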
Process data in streams for memory efficiency:
# Stream multiple pages
urls = [
{"url": "https://example1.com"},
{"url": "https://example2.com"},
{"url": "https://example3.com"},
]
for result in flow("curllm-stream://browse").stream(iter(urls)):
    print(f"Processed: {result}")
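The memory benefit of streaming comes from lazy evaluation: an item is only processed when the consumer asks for the next result. A generator-based sketch, where `stream_sketch` and `fake_browse` are stand-ins rather than Streamware functions:

```python
def stream_sketch(process, items):
    """Lazily apply `process` to each item, yielding results one at a time."""
    for item in items:
        yield process(item)


seen = []


def fake_browse(item):
    # Stand-in for a real page visit; records which URLs were handled.
    seen.append(item["url"])
    return {"url": item["url"], "status": "ok"}


urls = [
    {"url": "https://example1.com"},
    {"url": "https://example2.com"},
    {"url": "https://example3.com"},
]

results = stream_sketch(fake_browse, iter(urls))
first = next(results)
processed_after_first = len(seen)  # only one item has been handled so far
remaining = list(results)
print(processed_after_first, len(remaining))  # 1 2
```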
from curllm_core.streamware import pipeline
result = pipeline(
"curllm://browse?url=https://example.com",
"curllm://extract?instruction=Get data",
"transform://csv",
"file://write?path=output.csv"
).run()
from curllm_core.streamware import metrics
with metrics.track("scraping_pipeline"):
    result = flow("curllm://extract?url=...").run()
stats = metrics.get_stats("scraping_pipeline")
print(f"Processed: {stats['processed']}, Errors: {stats['errors']}")
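A tracker of this kind can be built from a context manager that counts successful and failed runs. The following is a toy stand-in for the `metrics` module; apart from the `processed` and `errors` keys shown above, every name in it is an assumption.

```python
from collections import defaultdict
from contextlib import contextmanager

_stats = defaultdict(lambda: {"processed": 0, "errors": 0})


@contextmanager
def track(name):
    """Count one processed run on success and one error on failure."""
    try:
        yield
    except Exception:
        _stats[name]["errors"] += 1
        raise  # the error is still propagated to the caller
    else:
        _stats[name]["processed"] += 1


def get_stats(name):
    return dict(_stats[name])


with track("demo"):
    pass  # a successful run

try:
    with track("demo"):
        raise ValueError("simulated failure")
except ValueError:
    pass

stats = get_stats("demo")
print(stats)  # {'processed': 1, 'errors': 1}
```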
from curllm_core.streamware import batch_process
urls = ["https://example1.com", "https://example2.com", ...]
results = batch_process(urls, "curllm://browse", batch_size=5)
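Batching amounts to slicing the input into fixed-size chunks and handling one chunk at a time. A minimal sketch, assuming sequential processing within each batch (the real `batch_process` may behave differently, for example by running batches concurrently):

```python
def batches(items, batch_size):
    """Yield consecutive slices of at most `batch_size` items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def batch_process_sketch(items, handler, batch_size=5):
    """Apply `handler` to every item, one batch at a time."""
    results = []
    for batch in batches(items, batch_size):
        results.extend(handler(item) for item in batch)
    return results


chunks = list(batches(list(range(7)), 3))
lengths = batch_process_sketch(["a", "bb", "ccc"], len, batch_size=2)
print(chunks)   # [[0, 1, 2], [3, 4, 5], [6]]
print(lengths)  # [1, 2, 3]
```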
from curllm_core.streamware import enable_diagnostics
enable_diagnostics("DEBUG") # Enable detailed logging
result = (
flow("curllm://browse?url=...")
.with_diagnostics(trace=True) # Per-pipeline diagnostics
.run()
)
from curllm_core.streamware import Component, register
@register("mycustom")
class MyCustomComponent(Component):
    input_mime = "application/json"
    output_mime = "application/json"

    def process(self, data):
        # Process data (transform_data stands in for your own logic)
        result = transform_data(data)
        return result
# Use it
flow("mycustom://action?param=value")
result = (
flow("curllm://browse?url=https://news.ycombinator.com&stealth=true")
| "curllm://extract?instruction=Get all stories with score > 100"
| "transform://jsonpath?query=$.items[*]"
| split("$[*]")
| "curllm://extract?instruction=Get full article content"
| join()
| "transform://csv"
| "file://write?path=/tmp/articles.csv"
).with_diagnostics().run()
form_data = {
"name": "John Doe",
"email": "john@example.com",
"phone": "+1234567890",
"message": "Contact request"
}
result = (
flow("curllm://fill_form?url=https://example.com/contact&visual=true")
.with_data(form_data)
| "file://write?path=/tmp/form_result.json"
).run()
sites = [
"https://site1.com",
"https://site2.com",
"https://site3.com"
]
for site in sites:
    with metrics.track(f"scrape_{site}"):
        result = (
            flow(f"curllm://extract?url={site}&instruction=Get product listings")
            | "transform://normalize"
            | f"file://write?path=/tmp/{site.replace('://', '_')}.json"
        ).run()
result = (
flow("curllm://extract?url=https://data-source.com&instruction=Get data")
| "transform://normalize"
| split("$.items[*]")
| "transform://flatten"
| join()
| "transform://csv"
| "file://write?path=/tmp/etl_output.csv"
| multicast([
"file://write?path=/tmp/backup.csv",
"http://api.example.com/import?method=post"
])
).run()
- stealth=true
- enable_diagnostics("DEBUG")
- metrics.track("pipeline_name")

- Component: Base component class
- StreamComponent: Base streaming component
- TransformComponent: Base transformation component
- StreamwareURI: URI parser
- Flow: Pipeline builder

- flow(uri): Create pipeline
- pipeline(*steps): Create multi-step pipeline
- split(pattern, type): Split data
- join(type): Join split data
- multicast(destinations): Send to multiple destinations
- choose(): Conditional routing
- enable_diagnostics(level): Enable logging
- batch_process(items, pipeline, batch_size): Batch processing
- @register(scheme): Register component

List all available components:
from curllm_core.streamware import list_available_components, describe_component
# List all schemes
schemes = list_available_components()
print(schemes) # ['curllm', 'http', 'https', 'file', 'transform', ...]
# Get component details
info = describe_component('curllm')
print(info)
curllm_core/
└── streamware/
    ├── __init__.py          # Main exports
    ├── core.py              # Base component classes
    ├── uri.py               # URI parsing
    ├── exceptions.py        # Custom exceptions
    ├── registry.py          # Component registry
    ├── flow.py              # Flow builder
    ├── patterns.py          # Split, join, multicast, choose
    ├── helpers.py           # Helper utilities
    └── components/          # Built-in components
        ├── __init__.py
        ├── curllm.py        # CurLLM components
        ├── web.py           # HTTP/Web components
        ├── file.py          # File I/O components
        └── transform.py     # Transform components
# examples/streamware_examples.py contains 15+ examples
python examples/streamware_examples.py
Same as CurLLM (Apache 2.0)