Class: EventHub

EventHub

Transaction processing in fabric v1.0 is a long operation spanning multiple components (application, endorsing peer, orderer, committing peer) and takes a relatively lengthy period of time (think seconds instead of milliseconds) to complete. As a result the applications must design their handling of the transaction lifecyle in an asynchrous fashion. After the transaction proposal has been successfully endorsed, and before the transaction message has been successfully broadcast to the orderer, the application should register a listener to be notified of the event when the transaction achieves finality, which is when the block containing the transaction gets added to the peer's ledger/blockchain.

Fabric committing peers provides an event stream to publish events to registered listeners. As of v1.0, the only events that get published are Block events. A Block event gets published whenever the committing peer adds a validated block to the ledger. There are three ways to register a listener to get notified:
  • register a "block listener" to get called for every block event on all channels. The listener will be passed a fully decoded Block object. See registerBlockEvent
  • register a "transaction listener" to get called when the specific transaction by id is committed (discovered inside a block event). The listener will be passed the transaction id and the validation code. See registerTxEvent
  • register a "chaincode event listener" to get called when a specific chaincode event has arrived. The listener will be passed the ChaincodeEvent. See registerChaincodeEvent

    The events are ephemeral, such that if a registered listener crashed when the event is published, the listener will miss the event. There are several techniques to compensate for missed events due to client crashes:
  • register block event listeners and record the block numbers received, such that when the next block arrives and its number is not the next in sequence, then the application knows exactly which block events have been missed. It can then use queryBlock to get those missed blocks from the target peer.
  • use a message queue to catch all the block events. With many robust message queue implementations available today, you will be guaranteed to not miss an event. A fabric event listener can be written in any programming language. The following implementations can be used as reference to write the necessary glue code between the fabric event stream and a message queue:
    • Node.js: this class. Source code can be found here
    • Java: part of the Java SDK for Hyperledger Fabric. Source code can be found here
    • Golang: an example event listener client can be found here

  • new EventHub(clientContext)

    Constructs an EventHub object
    Parameters:
    Name Type Description
    clientContext Client An instance of the Client class which has already been initialzed with a userContext.
    Example
    var eh = client.newEventHub();
    eh.setPeerAddr(
    	 'grpcs://localhost:7053',
    	 {
    	   pem: Buffer.from(certdata).toString(),
    	   'ssl-target-name-override': 'peer1']
    	 }
    );
    
    // register the listeners before calling "connect()" so that we can
    // have the error callback ready to process an error in case the
    // connect() call fails
    eh.registerTxEvent(
      transactionId,
    	 (tx, code) => {
    	   eh.unregisterTxEvent(transactionId);
    	   console.log(util.format('Transaction %s has completed', transactionId));
    	 },
    	 (err) => {
    	   eh.unregisterTxEvent(transactionId);
    	   console.log(util.format('Error %s! Transaction listener for %s has been ' +
                    'deregistered with %s', transactionId, err, eh.getPeerAddr()));
    	 }
    );
    
    eh.connect();

    Methods


    connect()

    Establishes a connection with the peer event source. The peer address must be set by calling the setPeerAddr() method before calling this method. The connection will be established asynchronously. If the connection fails to get established, the application will be notified via the error callbacks from the registerXXXEvent() methods. It is recommended that an application always registers at least one event listener with an error callback, by calling any one of the registerBlockEvent, registerTxEvent or registerChaincodeEvent methods, before calling connect().

    disconnect()

    Disconnects the event hub from the peer event source. Will close all event listeners and send an Error object with the message "EventHub has been shutdown" to all listeners that provided an "onError" callback.

    getPeerAddr()

    Return the peer url of this event hub object

    isconnected()

    Is the event hub connected to the event source?
    Returns:
    True if connected to the event source, false otherwise
    Type
    boolean

    registerBlockEvent(onEvent, onError)

    Register a listener to receive all block events from all the channels that the target peer is part of. The listener's "onEvent" callback gets called on the arrival of every block. If the target peer is expected to participate in more than one channel, then care must be taken in the listener's implementation to differentiate blocks from different channels. See the example below on how to accomplish that.

    An error may be thrown by this call if no "onError" callback is provided and this EventHub has noticed that the connection has not been established. However since the connection establishment is running asynchronously, a register call could be made before this EventHub has been notified of the network issue. The best practice would be to provide an "onError" callback to be notified when this EventHub has an issue.
    Parameters:
    Name Type Description
    onEvent function Callback function that takes a single parameter of a Block object
    onError function Optional callback function to be notified when this event hub is shutdown. The shutdown may be caused by a network error or by a call to the "disconnect()" method or a connection error.
    Returns:
    This is the block registration number that must be sed to unregister (see unregisterBlockEvent)
    Type
    int
    Example

    Find out the channel Id of the arriving block

    eh.registerBlockEvent(
      (block) => {
        var first_tx = block.data.data[0]; // get the first transaction
        var header = first_tx.payload.header; // the "header" object contains metadata of the transaction
        var channel_id = header.channel_header.channel_id;
        if ("mychannel" !== channel_id) return;
    
        // do useful processing of the block
      },
      (err) => {
        console.log('Oh snap!');
      }
    );

    registerChaincodeEvent(ccid, eventname, onEvent, onError)

    Register a listener to receive chaincode events.

    An error may be thrown by this call if no "onError" callback is provided and this EventHub has noticed that the connection has not been established. However since the connection establishment is running asynchronously, a register call could be made before this EventHub has been notified of the network issue. The best practice would be to provide an "onError" callback to be notified when this EventHub has an issue.
    Parameters:
    Name Type Description
    ccid string Id of the chaincode of interest
    eventname string The exact name of the chaincode event (must match the name given to the target chaincode's call to stub.SetEvent(name, payload)), or a regex string to match more than one event by this chaincode
    onEvent function callback function for matched events. It gets passed a single parameter which is a ChaincodeEvent object
    onError function Optional callback function to be notified when this event hub is shutdown. The shutdown may be caused by a network error or by a call to the "disconnect()" method or a connection error.
    Returns:
    An object that should be treated as an opaque handle used to unregister (see unregisterChaincodeEvent)
    Type
    Object

    registerTxEvent(txid, onEvent, onError)

    Register a callback function to receive a notification when the transaction by the given id has been committed into a block.

    An error may be thrown by this call if no "onError" callback is provided and this EventHub has noticed that the connection has not been established. However since the connection establishment is running asynchronously, a register call could be made before this EventHub has been notified of the network issue. The best practice would be to provide an "onError" callback to be notified when this EventHub has an issue.
    Parameters:
    Name Type Description
    txid string Transaction id string
    onEvent function Callback function that takes a parameter of type Transaction, and a string parameter which indicates if the transaction is valid (code = 'VALID'), or not (code string indicating the reason for invalid transaction)
    onError function Optional callback function to be notified when this event hub is shutdown. The shutdown may be caused by a network error or by a call to the "disconnect()" method or a connection error.

    setPeerAddr(peeraddr, opts)

    Set peer event source url.
    Parameters:
    Name Type Description
    peeraddr string grpc or grpcs URL for the target peer's event source
    opts ConnectionOpts The options for the connection to the peer.

    unregisterBlockEvent(The)

    Unregister the block event listener using the block registration number that is returned by the call to the registerBlockEvent() method.
    Parameters:
    Name Type Description
    The int block registration number that was returned during registration.

    unregisterChaincodeEvent(listener_handle)

    Unregister the chaincode event listener represented by the listener_handle object returned by the registerChaincodeEvent() method
    Parameters:
    Name Type Description
    listener_handle Object The handle object returned from the call to registerChaincodeEvent.

    unregisterTxEvent(txid)

    Unregister transaction event listener for the transaction id.
    Parameters:
    Name Type Description
    txid string The transaction id