Streaming Events of StreamCardano API Primer in Haskell

Streaming Events of StreamCardano API Primer in Haskell

We will show you how to stream events from your StreamCardano.com dynamic API using Haskell HTTP client servant-client!

Preliminaries #

Lets setup the environment variables so our Haskell code does not hold our configuration and secrets:

export STREAMCARDANO_HOST=beta.streamcardano.dev
export STREAMCARDANO_KEY="...your API key here..."

Then lets see all imports required in the code below:

{-# LANGUAGE DataKinds     #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE OverloadedStrings #-}

-- To create the client manager.
import           Network.HTTP.Client.TLS (newTlsManager)

-- Servant imports for client stream.
import           Servant
import           Servant.Client.Streaming
import qualified Servant.Types.SourceT as S

-- To get the application key from the environment, instead of putting it unsafely within the program code.
import           System.Environment

-- Pretty print the Responses.
import Data.Aeson.Encode.Pretty

-- bytestring
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as Char
import qualified Data.ByteString.Lazy as BSL

-- Text
import Data.Text

Now we define the network manager, host and bearer token.

>>> mgr <- newTlsManager
>>> host <- ("https://" <>) <$> getEnv "STREAMCARDANO_HOST"
>>> burl <- parseBaseUrl host
>>> key <- getEnv "STREAMCARDANO_KEY"
>>> let authHeader = "Bearer " <> pack key

Now we define the API and client using servant library.

type DynamicSSEAPI = "api" :> "v1" :> "sse" :> Header "Authorization" Text :> ReqBody '[PlainText] Text :> StreamPost NoFraming OctetStream (SourceIO BS.ByteString)

api :: Proxy DynamicSSEAPI
api = Proxy

dynamicClient :: Client ClientM DynamicSSEAPI
dynamicClient = client api

Query #

Below query is used to get the block info from block, which will be sent to dynamic/sse API.

>>> let query = "SELECT block_no,hash,tx_count from block order by id desc LIMIT 1"

Event Handler #

The function handleEvent takes each Event and prints it

  • On receiving error or end of stream, it will stop listening for events.
  • On receiving event, will pretty print it as Haskell Type.
handleEvent :: S.StepT IO BS.ByteString -> IO ()
handleEvent S.Stop        = putStrLn "End of stream"
handleEvent (S.Error err) = putStrLn $ "Error occurred while streaming events: " <> show err
handleEvent (S.Skip s)    = handleEvent s
handleEvent (S.Effect ms) = ms >>= handleEvent
handleEvent (S.Yield cur s) = (putStrLn $ Char.unpack $ BSL.toStrict $ encodePretty $ Char.unpack cur) *> handleEvent s

Response Handler #

For each response, we unpack the response stream, and apply event handler to the result.


handleResponseStream :: Either ClientError (SourceIO BS.ByteString) -> IO ()
handleResponseStream resp =
  case resp of
    Left err  -> print err
    Right src -> S.unSourceT src handleEvent

We hit the API using the above defined client and response callback.

withClientM (dynamicClient (Just authHeader)  query) (mkClientEnv mgr burl) handleResponseStream

Output #

Response stream of events are printed as below:

"event:new_block\ndata:[{\"block_no\":3929536,\"hash\":\"\\\\xd096cb4bd417516d6642e654bf94ea7181474a7e9d64b9b18ff25b01336cd014\",\"tx_count\":3}]\n\n"
"event:new_block\ndata:[{\"block_no\":3929537,\"hash\":\"\\\\x252f2d662264b583bd85031d7e646caa440a25f95a576679b282b63bef258bef\",\"tx_count\":1}]\n\n"

Pretty print #

Helper #

implemented below helper function to convert the event data into JSON.

convertEventToJSON :: BS.ByteString -> BS.ByteString
convertEventToJSON d =
  "{" <> (BS.intercalate "," (addQuotes <$> Prelude.filter ((/=) "") (BS.split 10 d))) <> "}"
  where
    addQuotes kv =
      let (k: v) = BS.split 58 kv -- split on `:`
      in if k == "event"
            then (Char.pack $ show k) <> ":" <> (Char.pack $ show $ BS.intercalate ":" v)
            else (Char.pack $ show k) <> ":" <> (BS.intercalate ":" v)

Response Stream Transformer #

The below function will convert a ByteString event into Object event using the above transformer.

transformToJSON :: S.StepT IO BS.ByteString -> S.StepT IO Object
transformToJSON (S.Yield cur s) = S.Yield (fromJust (decode $ BSL.fromStrict $ convertEventToJSON cur)) (transformToJSON s)
transformToJSON (S.Effect ms) = S.Effect (transformToJSON <$> ms)
transformToJSON (S.Skip s) = S.Skip (transformToJSON s)
transformToJSON S.Stop = S.Stop
transformToJSON (S.Error a) = S.Error a

Event Handler #

Handler for each event in the stream can be defined as below:

The function handleEvent takes each Event and prints it

  • On receiving error or end of stream, it will stop listening for events.
  • On receiving event, will pretty print it as Haskell Type.
handleEvent :: S.StepT IO Object -> IO ()
handleEvent S.Stop        = putStrLn "End of stream"
handleEvent (S.Error err) = putStrLn $ "Error occurred while streaming events: " <> show err
handleEvent (S.Skip s)    = handleEvent s
handleEvent (S.Effect ms) = ms >>= handleEvent
handleEvent (S.Yield cur s) = (putStrLn $ Char.unpack $ BSL.toStrict $ encodePretty cur) *> handleEvent s

Respone Handler #

Using the above mapper and event handler will rewrite the handleResponse as below.


handleResponsePP :: Either ClientError (SourceIO BS.ByteString) -> IO ()
handleResponsePP resp =
  case resp of
    Left err  -> print err
    Right src ->
      let objStream = S.mapStepT transformToJSON src
        in S.unSourceT objStream handleEvent

Now we can assemble all these together to get and print the response

withClientM (dynamicClient (Just authHeader) query) (mkClientEnv mgr burl) handleResponsePP

Output #

{
    "data": [
        {
            "block_no": 3930459,
            "hash": "\\x8fd95640cfad377839795be890a69c58be66b5bba3831104aa499d2e010c9f6b",
            "tx_count": 1
        }
    ],
    "event": "new_block"
}
{
    "data": [
        {
            "block_no": 3930460,
            "hash": "\\xcc2d0bf9457a493404dac2444eb28baf80cac0fc611d8d395153db1abacac05a",
            "tx_count": 0
        }
    ],
    "event": "new_block"
}

Haskell Types #

The response stream of events can be decoded into Haskell Types as below:

-- required for Deriving Instances for Generic.
{-# LANGUAGE DeriveGeneric #-}

import GHC.Generics


data EventData = EventData
  { event :: Text
  , _data :: [EventInfo] -- since data is a keyword
  }
  deriving (Generic, Show)

-- implemented custom instances to handle the key "data" while serialization/deserialization.
instance FromJSON EventData where
  parseJSON = withObject "EventData" $ \obj ->
    EventData <$> obj .: "event" <*> obj .: "data"

instance ToJSON EventData where
  toJSON (EventData e d) = object ["event" .= e, "data" .= d]

data EventInfo = EventInfo
  { block_no :: Int
  , hash :: Text
  , tx_count :: Int
  }
  deriving (Generic, Show)

instance ToJSON EventInfo
instance FromJSON EventInfo

Response Stream Transformer #

Now we will convert the stream of ByteString events into stream of EventData type.

transformToEventData :: S.StepT IO BS.ByteString -> S.StepT IO EventData
transformToEventData (S.Yield cur s) = S.Yield (fromJust (decode $ BSL.fromStrict $ convertEventToJSON cur)) (transformToEventData s)
transformToEventData (S.Effect ms) = S.Effect (transformToEventData <$> ms)
transformToEventData (S.Skip s) = S.Skip (transformToEventData s)
transformToEventData S.Stop = S.Stop
transformToEventData (S.Error a) = S.Error a

Event Handler #

Handler can be defined for each event of type EventData as below:

The function handleEvent takes each Event and prints it

  • On receiving error or end of stream, it will stop listening for events.
  • On receiving event, will pretty print it as Haskell Type.
handleEvent :: S.StepT IO EventData -> IO ()
handleEvent S.Stop        = putStrLn "End of stream"
handleEvent (S.Error err) = putStrLn $ "Error occurred while streaming events: " <> show err
handleEvent (S.Skip s)    = handleEvent s
handleEvent (S.Effect ms) = ms >>= handleEvent
handleEvent (S.Yield cur s) = (putStrLn $ Char.unpack $ BSL.toStrict $ encodePretty cur) *> handleEvent s

Response Handler #

Using the above transformer, action handler the handleResponse will be implemented as below:


handleResponseCustom :: Either ClientError (SourceIO BS.ByteString) -> IO ()
handleResponseCustom resp =
  case resp of
    Left err  -> print err
    Right src ->
      let objStream = S.mapStepT transformToEventData src
        in S.unSourceT objStream handleEvent

Now we can assemble all these together to get and print the response

withClientM (dynamicClient (Just authHeader) query) (mkClientEnv mgr burl) handleResponsePP

If you want to look at database schema, we are using CardanoDBSync.