Streaming
O SDK expõe dois streams de longa duração:
- Stream de snapshots — atualizações de algos enviadas pela engine.
- Stream de market data — atualizações de book e trades por papel.
Ambos são async iterators com reconexão automática, replay de
assinaturas e um sentinela StreamReconnected após cada reconexão
bem-sucedida, para que o consumidor possa re-inicializar seu estado.
Stream de snapshots
Assine todos os seus algos rodando (ou um subconjunto filtrado) e reaja conforme a engine envia atualizações.
from investflex.ws.stream import StreamReconnected
from investflex.models.snapshot import StrategySnapshot
async with AsyncInvestflexClient.from_env() as client:
async with client.algos.stream() as stream:
await stream.subscribe_all()
async for evt in stream:
if isinstance(evt, StreamReconnected):
# A conexão caiu e voltou. Reinicialize se você
# mantém um cache local.
continue
if isinstance(evt, StrategySnapshot):
print(evt.clord_id, evt.status.name,
evt.output.cum_qty_perc, "%")
Se você prefere um dict gerenciado em vez de consumir eventos na mão, veja Snapshots → Live view.
Backpressure
Os construtores de stream aceitam um modo backpressure:
"block"(padrão) — consumidores lentos seguram o produtor."drop_oldest"— limita a fila descartando eventos antigos.
client.algos.stream(queue_size=2048, backpressure="drop_oldest")
Stream de market data
O stream de market data é por canal: você assina um canal book ou
trades para um símbolo de cada vez. Assinaturas são contadas por
referência, então assinar o mesmo par (canal, símbolo) duas vezes
gera apenas uma assinatura upstream.
from investflex.models.marketdata import Book, Trade
async with AsyncInvestflexClient.from_env() as client:
async with client.marketdata.stream() as stream:
await stream.subscribe_book("PETR4")
await stream.subscribe_trades("PETR4")
async for evt in stream:
channel = evt.get("channel")
data = evt.get("data") or {}
if channel == "book":
book = Book.model_validate(data)
if book.bids and book.asks:
print("toque", book.bids[0].price, "/", book.asks[0].price)
elif channel == "trades":
trade = Trade.model_validate(data)
print("trade", trade.price, "x", trade.qty)
Para conveniência, MarketDataService.parse(stream) faz a
classificação + model_validate e devolve Book, Trade ou
Definition tipados:
from investflex.services.marketdata import MarketDataService
from investflex.models.marketdata import Book, Trade
async with AsyncInvestflexClient.from_env() as client:
async with client.marketdata.stream() as stream:
await stream.subscribe_book("PETR4")
await stream.subscribe_trades("PETR4")
async for evt in MarketDataService.parse(stream):
if isinstance(evt, Book):
print("book", len(evt.bids), "/", len(evt.asks))
elif isinstance(evt, Trade):
print("trade", evt.price)
Book
Book é a visão MBP agregada de um papel:
| Campo | Tipo | Observações |
|---|---|---|
symbol |
str | |
bids |
list de BookLevel |
Ordenado do melhor ao pior. |
asks |
list de BookLevel |
Ordenado do melhor ao pior. |
timestamp |
datetime | None |
BookLevel: price, qty, orders.
Cancelando assinatura
await stream.unsubscribe(channel="trades", symbol="PETR4")
Unsubscribes também são contados por referência; a assinatura upstream permanece enquanto outro consumidor a mantiver.
Semântica de reconexão
Ambos os streams reconectam automaticamente em falhas transientes de rede, com backoff exponencial. Após uma reconexão bem-sucedida:
- Assinaturas pré-existentes são reaplicadas no upstream.
- Um
StreamReconnected(attempt=N)é emitido no fluxo de eventos para você re-inicializar estado local (ex.: listando os algos novamente, ou limpando uma escada de livro).
Se você quer essa re-inicialização tratada automaticamente, o helper live view faz isso de forma transparente.
Limpeza
Streams são async context managers — entrar no async with
conecta, sair derruba o WebSocket. Não gerencie a conexão na mão.