Streaming

Streaming

O SDK expõe dois streams de longa duração:

  • Stream de snapshots — atualizações de algos enviadas pela engine.
  • Stream de market data — atualizações de book e trades por papel.

Ambos são async iterators com reconexão automática, replay de assinaturas e um sentinela StreamReconnected após cada reconexão bem-sucedida, para que o consumidor possa re-inicializar seu estado.

Stream de snapshots

Assine todos os seus algos rodando (ou um subconjunto filtrado) e reaja conforme a engine envia atualizações.

from investflex.ws.stream import StreamReconnected
from investflex.models.snapshot import StrategySnapshot

async with AsyncInvestflexClient.from_env() as client:
    async with client.algos.stream() as stream:
        await stream.subscribe_all()
        async for evt in stream:
            if isinstance(evt, StreamReconnected):
                # A conexão caiu e voltou. Reinicialize se você
                # mantém um cache local.
                continue
            if isinstance(evt, StrategySnapshot):
                print(evt.clord_id, evt.status.name,
                      evt.output.cum_qty_perc, "%")

Se você prefere um dict gerenciado em vez de consumir eventos na mão, veja Snapshots → Live view.

Backpressure

Os construtores de stream aceitam um modo backpressure:

  • "block" (padrão) — consumidores lentos seguram o produtor.
  • "drop_oldest" — limita a fila descartando eventos antigos.
client.algos.stream(queue_size=2048, backpressure="drop_oldest")

Stream de market data

O stream de market data é por canal: você assina um canal book ou trades para um símbolo de cada vez. Assinaturas são contadas por referência, então assinar o mesmo par (canal, símbolo) duas vezes gera apenas uma assinatura upstream.

from investflex.models.marketdata import Book, Trade

async with AsyncInvestflexClient.from_env() as client:
    async with client.marketdata.stream() as stream:
        await stream.subscribe_book("PETR4")
        await stream.subscribe_trades("PETR4")
        async for evt in stream:
            channel = evt.get("channel")
            data = evt.get("data") or {}
            if channel == "book":
                book = Book.model_validate(data)
                if book.bids and book.asks:
                    print("toque", book.bids[0].price, "/", book.asks[0].price)
            elif channel == "trades":
                trade = Trade.model_validate(data)
                print("trade", trade.price, "x", trade.qty)

Para conveniência, MarketDataService.parse(stream) faz a classificação + model_validate e devolve Book, Trade ou Definition tipados:

from investflex.services.marketdata import MarketDataService
from investflex.models.marketdata import Book, Trade

async with AsyncInvestflexClient.from_env() as client:
    async with client.marketdata.stream() as stream:
        await stream.subscribe_book("PETR4")
        await stream.subscribe_trades("PETR4")
        async for evt in MarketDataService.parse(stream):
            if isinstance(evt, Book):
                print("book", len(evt.bids), "/", len(evt.asks))
            elif isinstance(evt, Trade):
                print("trade", evt.price)

Book

Book é a visão MBP agregada de um papel:

Campo Tipo Observações
symbol str
bids list de BookLevel Ordenado do melhor ao pior.
asks list de BookLevel Ordenado do melhor ao pior.
timestamp datetime | None

BookLevel: price, qty, orders.

Cancelando assinatura

await stream.unsubscribe(channel="trades", symbol="PETR4")

Unsubscribes também são contados por referência; a assinatura upstream permanece enquanto outro consumidor a mantiver.

Semântica de reconexão

Ambos os streams reconectam automaticamente em falhas transientes de rede, com backoff exponencial. Após uma reconexão bem-sucedida:

  • Assinaturas pré-existentes são reaplicadas no upstream.
  • Um StreamReconnected(attempt=N) é emitido no fluxo de eventos para você re-inicializar estado local (ex.: listando os algos novamente, ou limpando uma escada de livro).

Se você quer essa re-inicialização tratada automaticamente, o helper live view faz isso de forma transparente.

Limpeza

Streams são async context managers — entrar no async with conecta, sair derruba o WebSocket. Não gerencie a conexão na mão.