This tutorial shows how to stream events from your StreamCardano.com dynamic API using servant-client, a Haskell HTTP client library.
Let's set up the environment variables first, so that our Haskell code does not contain our configuration and secrets:
export STREAMCARDANO_HOST=beta.streamcardano.dev
export STREAMCARDANO_KEY="...your API key here..."
Next, here are the language extensions and imports required by the code below:
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE OverloadedStrings #-}
-- To create the client manager.
import Network.HTTP.Client.TLS (newTlsManager)
-- Servant imports for client stream.
import Servant
import Servant.Client.Streaming
import qualified Servant.Types.SourceT as S
-- To get the application key from the environment, instead of putting it unsafely within the program code.
import System.Environment
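-- Decode and pretty-print the responses.
import Data.Aeson (FromJSON (..), Object, ToJSON (..), decode, object, withObject, (.:), (.=))
import Data.Aeson.Encode.Pretty (encodePretty)
-- fromJust, used when decoding events further below.
import Data.Maybe (fromJust)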
-- bytestring
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as Char
import qualified Data.ByteString.Lazy as BSL
-- Text (an explicit import list avoids name clashes with the Prelude)
import Data.Text (Text, pack)
Now we define the network manager, the base URL of the host, and the bearer token:
>>> mgr <- newTlsManager
>>> host <- ("https://" <>) <$> getEnv "STREAMCARDANO_HOST"
>>> burl <- parseBaseUrl host
>>> key <- getEnv "STREAMCARDANO_KEY"
>>> let authHeader = "Bearer " <> pack key
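The lookups above use getEnv, which throws an IOError when a variable is unset. Below is a minimal sketch of a friendlier variant, assuming you would rather exit with an explicit message. The names requireEnv, mkBaseUrlString and mkAuthHeader are hypothetical helpers for illustration, not part of any library.

```haskell
import System.Environment (lookupEnv)
import System.Exit (die)

-- | Read a required environment variable, or exit with a readable message
-- when it is unset or empty. (Hypothetical helper.)
requireEnv :: String -> IO String
requireEnv name = do
  value <- lookupEnv name
  case value of
    Just v | not (null v) -> pure v
    _ -> die ("Missing required environment variable: " <> name)

-- | Prefix the host with the scheme, as done for STREAMCARDANO_HOST above.
mkBaseUrlString :: String -> String
mkBaseUrlString h = "https://" <> h

-- | Build the value of the Authorization header from the API key.
mkAuthHeader :: String -> String
mkAuthHeader k = "Bearer " <> k
```

In the session above this would read host <- mkBaseUrlString <$> requireEnv "STREAMCARDANO_HOST", followed by the same parseBaseUrl call.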
Next, we define the API type and its client using the servant library:
type DynamicSSEAPI = "api" :> "v1" :> "sse" :> Header "Authorization" Text :> ReqBody '[PlainText] Text :> StreamPost NoFraming OctetStream (SourceIO BS.ByteString)
api :: Proxy DynamicSSEAPI
api = Proxy
dynamicClient :: Client ClientM DynamicSSEAPI
dynamicClient = client api
The query below fetches the number, hash, and transaction count of the latest block from the block table. It is sent as the request body to the SSE endpoint.
>>> let query = "SELECT block_no,hash,tx_count from block order by id desc LIMIT 1"
The function handleEvent walks the stream and prints each event as it arrives:
handleEvent :: S.StepT IO BS.ByteString -> IO ()
handleEvent S.Stop = putStrLn "End of stream"
handleEvent (S.Error err) = putStrLn $ "Error occurred while streaming events: " <> show err
handleEvent (S.Skip s) = handleEvent s
handleEvent (S.Effect ms) = ms >>= handleEvent
handleEvent (S.Yield cur s) = (putStrLn $ Char.unpack $ BSL.toStrict $ encodePretty $ Char.unpack cur) *> handleEvent s
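To experiment with the recursion in handleEvent without a network connection, here is a minimal sketch built on a simplified stand-in type. Step, collect and example are hypothetical names; the real StepT in Servant.Types.SourceT is parameterised over the monad, which this sketch fixes to IO.

```haskell
-- Simplified, hypothetical stand-in for servant's StepT IO.
data Step a
  = Stop
  | Error String
  | Skip (Step a)
  | Yield a (Step a)
  | Effect (IO (Step a))

-- | Walk the stream the same way handleEvent does, but collect the yielded
-- values instead of printing them. Returns Left on the first Error step.
collect :: Step a -> IO (Either String [a])
collect Stop = pure (Right [])
collect (Error err) = pure (Left err)
collect (Skip s) = collect s
collect (Effect ms) = ms >>= collect
collect (Yield cur s) = fmap (cur :) <$> collect s

-- | A tiny hand-built stream: two values separated by a Skip and an Effect.
example :: Step Int
example = Yield 1 (Skip (Effect (pure (Yield 2 Stop))))
```

Each constructor is handled exactly once, which is why the handler above needs five equations: two that end the stream (Stop, Error), two that only advance it (Skip, Effect), and one that carries a value (Yield).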
Given the response, we unwrap the stream and apply the event handler to it; a client error is printed instead:
handleResponseStream :: Either ClientError (SourceIO BS.ByteString) -> IO ()
handleResponseStream resp =
  case resp of
    Left err -> print err
    Right src -> S.unSourceT src handleEvent
We call the API using the client and response callback defined above:
withClientM (dynamicClient (Just authHeader) query) (mkClientEnv mgr burl) handleResponseStream
The events in the response stream are printed as follows:
"event:new_block\ndata:[{\"block_no\":3929536,\"hash\":\"\\\\xd096cb4bd417516d6642e654bf94ea7181474a7e9d64b9b18ff25b01336cd014\",\"tx_count\":3}]\n\n"
"event:new_block\ndata:[{\"block_no\":3929537,\"hash\":\"\\\\x252f2d662264b583bd85031d7e646caa440a25f95a576679b282b63bef258bef\",\"tx_count\":1}]\n\n"
The helper function below converts the raw event data into JSON:
convertEventToJSON :: BS.ByteString -> BS.ByteString
convertEventToJSON d =
  "{" <> (BS.intercalate "," (addQuotes <$> Prelude.filter ((/=) "") (BS.split 10 d))) <> "}"
  where
    addQuotes kv =
      let (k: v) = BS.split 58 kv -- split on `:`
       in if k == "event"
            then (Char.pack $ show k) <> ":" <> (Char.pack $ show $ BS.intercalate ":" v)
            else (Char.pack $ show k) <> ":" <> (BS.intercalate ":" v)
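convertEventToJSON relies on the line format seen in the raw output: one "name:value" pair per line, terminated by a blank line. As a sketch of that format in isolation, the following parses one event into its fields using plain strings. parseField and parseEvent are hypothetical names, and the sketch assumes there is no space after the colon, as in the output shown above.

```haskell
-- | Split one line of an event into (field name, value) at the first ':'.
-- A line without a colon is treated as a field name with an empty value.
parseField :: String -> (String, String)
parseField line =
  case break (== ':') line of
    (name, ':' : value) -> (name, value)
    (name, _) -> (name, "")

-- | Parse one raw event into its fields, skipping blank lines.
parseEvent :: String -> [(String, String)]
parseEvent = map parseField . filter (not . null) . lines
```

Splitting at the first colon only means the JSON payload of the data field, which itself contains colons, stays in one piece without having to be re-joined.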
The function below converts each ByteString event into an Object event using the transformer above:
transformToJSON :: S.StepT IO BS.ByteString -> S.StepT IO Object
transformToJSON (S.Yield cur s) = S.Yield (fromJust (decode $ BSL.fromStrict $ convertEventToJSON cur)) (transformToJSON s)
transformToJSON (S.Effect ms) = S.Effect (transformToJSON <$> ms)
transformToJSON (S.Skip s) = S.Skip (transformToJSON s)
transformToJSON S.Stop = S.Stop
transformToJSON (S.Error a) = S.Error a
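transformToJSON treats every chunk yielded by the stream as exactly one complete event. With NoFraming that may not always hold, since a chunk boundary can in principle fall in the middle of an event. Below is a minimal sketch of buffering, assuming events are terminated by a blank line as in the output shown earlier. splitEvents is a hypothetical helper name.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.ByteString as BS

-- | Split buffered input into the complete events it contains (each one
-- terminated by a blank line) and the unterminated remainder, which should
-- be kept and prepended to the next chunk.
splitEvents :: BS.ByteString -> ([BS.ByteString], BS.ByteString)
splitEvents buf
  | BS.null more = ([], buf) -- no terminator found yet
  | otherwise =
      let (evs, rest) = splitEvents (BS.drop 2 more) -- drop the "\n\n"
       in (ev : evs, rest)
  where
    (ev, more) = BS.breakSubstring "\n\n" buf
```

Each complete event returned this way can be passed to convertEventToJSON, which already ignores empty lines.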
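The handler for each event is the same as before, except that it now consumes a stream of Object values. If you keep all snippets in one module, give this variant a different name, because handleEvent is already defined above: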
handleEvent :: S.StepT IO Object -> IO ()
handleEvent S.Stop = putStrLn "End of stream"
handleEvent (S.Error err) = putStrLn $ "Error occurred while streaming events: " <> show err
handleEvent (S.Skip s) = handleEvent s
handleEvent (S.Effect ms) = ms >>= handleEvent
handleEvent (S.Yield cur s) = (putStrLn $ Char.unpack $ BSL.toStrict $ encodePretty cur) *> handleEvent s
Using the mapper and event handler above, we rewrite the response handler as follows:
handleResponsePP :: Either ClientError (SourceIO BS.ByteString) -> IO ()
handleResponsePP resp =
  case resp of
    Left err -> print err
    Right src ->
      let objStream = S.mapStepT transformToJSON src
       in S.unSourceT objStream handleEvent
Now we can put all of this together to fetch and pretty-print the response:
withClientM (dynamicClient (Just authHeader) query) (mkClientEnv mgr burl) handleResponsePP
{
    "data": [
        {
            "block_no": 3930459,
            "hash": "\\x8fd95640cfad377839795be890a69c58be66b5bba3831104aa499d2e010c9f6b",
            "tx_count": 1
        }
    ],
    "event": "new_block"
}
{
    "data": [
        {
            "block_no": 3930460,
            "hash": "\\xcc2d0bf9457a493404dac2444eb28baf80cac0fc611d8d395153db1abacac05a",
            "tx_count": 0
        }
    ],
    "event": "new_block"
}
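The hash values in this output start with a backslash followed by x, which looks like the hex notation PostgreSQL uses for binary columns. If you only need the hex digits, a small sketch such as the following removes that prefix. stripHexPrefix is a hypothetical helper name, and the assumption that the prefix is always exactly these two characters is mine.

```haskell
import Data.List (stripPrefix)
import Data.Maybe (fromMaybe)

-- | Drop a leading backslash-x from a hash if present; otherwise return
-- the input unchanged.
stripHexPrefix :: String -> String
stripHexPrefix h = fromMaybe h (stripPrefix "\\x" h)
```

Note that the decoded JSON string contains a single backslash; the doubled backslash in the printed output is only JSON escaping.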
The events in the response stream can also be decoded into Haskell types, as shown below:
-- Required for deriving Generic instances.
{-# LANGUAGE DeriveGeneric #-}
import GHC.Generics
data EventData = EventData
  { event :: Text
  , _data :: [EventInfo] -- since data is a keyword
  }
  deriving (Generic, Show)
-- Custom instances that map the JSON key "data" to the field _data during serialization and deserialization.
instance FromJSON EventData where
  parseJSON = withObject "EventData" $ \obj ->
    EventData <$> obj .: "event" <*> obj .: "data"
instance ToJSON EventData where
  toJSON (EventData e d) = object ["event" .= e, "data" .= d]
data EventInfo = EventInfo
  { block_no :: Int
  , hash :: Text
  , tx_count :: Int
  }
  deriving (Generic, Show)
instance ToJSON EventInfo
instance FromJSON EventInfo
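As an alternative to the hand-written instances for EventData, aeson's generic machinery can rename fields through Options. The following is a sketch under the assumption that stripping leading underscores is the only renaming needed. eventOptions and decodeEvent are hypothetical names, and Eq is derived here only so the values can be compared.

```haskell
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson
  ( FromJSON (..), Options, ToJSON (..), defaultOptions, eitherDecode
  , fieldLabelModifier, genericParseJSON, genericToJSON )
import qualified Data.ByteString.Lazy as BSL
import Data.Text (Text)
import GHC.Generics (Generic)

-- Strip leading underscores so the field _data maps to the JSON key "data".
eventOptions :: Options
eventOptions = defaultOptions { fieldLabelModifier = dropWhile (== '_') }

data EventInfo = EventInfo
  { block_no :: Int
  , hash :: Text
  , tx_count :: Int
  }
  deriving (Generic, Show, Eq)

instance FromJSON EventInfo
instance ToJSON EventInfo

data EventData = EventData
  { event :: Text
  , _data :: [EventInfo]
  }
  deriving (Generic, Show, Eq)

instance FromJSON EventData where
  parseJSON = genericParseJSON eventOptions

instance ToJSON EventData where
  toJSON = genericToJSON eventOptions

-- | Decode one JSON event, reporting a message instead of Nothing on failure.
decodeEvent :: BSL.ByteString -> Either String EventData
decodeEvent = eitherDecode
```

The hand-written instances are more explicit; the generic version mainly pays off when more fields are added later.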
Now we convert the stream of ByteString events into a stream of EventData values:
transformToEventData :: S.StepT IO BS.ByteString -> S.StepT IO EventData
transformToEventData (S.Yield cur s) = S.Yield (fromJust (decode $ BSL.fromStrict $ convertEventToJSON cur)) (transformToEventData s)
transformToEventData (S.Effect ms) = S.Effect (transformToEventData <$> ms)
transformToEventData (S.Skip s) = S.Skip (transformToEventData s)
transformToEventData S.Stop = S.Stop
transformToEventData (S.Error a) = S.Error a
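Both transformToJSON and transformToEventData use fromJust, which throws an exception as soon as one event fails to decode. A minimal sketch of the alternative, turning a failed decode into an Error step, is shown below on a simplified stand-in type so that it runs without servant. Step, mapEither, drain and readInt are hypothetical names, and readInt merely plays the role of the JSON decoder.

```haskell
import Text.Read (readEither)

-- Simplified, hypothetical stand-in for servant's StepT IO.
data Step a
  = Stop
  | Error String
  | Skip (Step a)
  | Yield a (Step a)
  | Effect (IO (Step a))

-- | Apply a fallible decoder to every yielded value. The first failure ends
-- the stream with an Error step instead of throwing.
mapEither :: (a -> Either String b) -> Step a -> Step b
mapEither f = go
  where
    go Stop = Stop
    go (Error e) = Error e
    go (Skip s) = Skip (go s)
    go (Effect ms) = Effect (go <$> ms)
    go (Yield cur s) = case f cur of
      Left e -> Error ("decode failed: " <> e)
      Right v -> Yield v (go s)

-- | Drain a stream into a list, for demonstration purposes only.
drain :: Step a -> IO (Either String [a])
drain Stop = pure (Right [])
drain (Error e) = pure (Left e)
drain (Skip s) = drain s
drain (Effect ms) = ms >>= drain
drain (Yield cur s) = fmap (cur :) <$> drain s

-- | Stand-in decoder: parse an Int from its textual form.
readInt :: String -> Either String Int
readInt = readEither
```

With the real types, the same idea amounts to replacing fromJust (decode ...) with a case expression on aeson's eitherDecode and returning S.Error on Left, so that the existing S.Error branch of handleEvent reports the problem.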
The handler for events of type EventData again has the same shape; only the type signature changes:
handleEvent :: S.StepT IO EventData -> IO ()
handleEvent S.Stop = putStrLn "End of stream"
handleEvent (S.Error err) = putStrLn $ "Error occurred while streaming events: " <> show err
handleEvent (S.Skip s) = handleEvent s
handleEvent (S.Effect ms) = ms >>= handleEvent
handleEvent (S.Yield cur s) = (putStrLn $ Char.unpack $ BSL.toStrict $ encodePretty cur) *> handleEvent s
Using the transformer and event handler above, the response handler is implemented as follows:
handleResponseCustom :: Either ClientError (SourceIO BS.ByteString) -> IO ()
handleResponseCustom resp =
  case resp of
    Left err -> print err
    Right src ->
      let objStream = S.mapStepT transformToEventData src
       in S.unSourceT objStream handleEvent
Now we can put all of this together to fetch and print the response:
withClientM (dynamicClient (Just authHeader) query) (mkClientEnv mgr burl) handleResponseCustom
The database schema comes from CardanoDBSync, so see its documentation for the available tables and columns.