Source code for houndapi.client.methods.subscribe

from houndapi.schema.subscription import (
    LogsSubscription,
    PendingTransactionsSubscription,
)
from houndapi.schema.responses.subscription import (
    SubscriptionResponse
)
from houndapi.logger import get_logger
from typing import Union, List
import houndapi

[docs] class Subscribe:
[docs] async def subscribe( self: "houndapi.HoundAPI", subscriptions: Union[ Union[LogsSubscription, PendingTransactionsSubscription], List[Union[LogsSubscription, PendingTransactionsSubscription]] ] ) -> Union[SubscriptionResponse, None]: '''Subscribe Args: subscriptions (Union[Union[LogsSubscription, PendingTransactionsSubscription],List[Union[LogsSubscription, PendingTransactionsSubscription]]]]): the Subscriptions Returns: Union[None, :class:`houndapi.schema.responses.subscription.SubscriptionResponse`]''' if not isinstance(subscriptions, list): subscriptions = [subscriptions] try: result = await self.post( endpoint="/api/subscribe", body=[sub.model_dump() for sub in subscriptions] ) return SubscriptionResponse( **result ) except Exception as e: get_logger().info( f"Failed to subscribe ({e})", extra={"endpoint": "/api/subscribe"} )