Skip to content

Contact sales

By filling out this form and clicking submit, you acknowledge our privacy policy.

Crossing the streams with Azure Event Hubs and Stream Analytics

Real-time data poses challenges for traditional architectures, like joining streams of data. Here's how to join streams with Azure Event Hubs and Azure Stream Analytics.

Jun 08, 2023 • 12 Minute Read

Please set an alt value for this image...

by Abhishek Gupta
Blog | LinkedIn | Twitter

This blog provides a practical example of how to use Azure Stream Analytics to process streaming data from Azure Event Hubs. You should be able to go through this tutorial using the Azure Portal (or Azure CLI), without writing any code. There are also other resources for exploring stream processing with Azure Stream Analytics at the end of this blog post.

What’s covered?

  • A quick overview of the use case, solution, and its constituents
  • How to setup the required Azure services: Event Hubs, Stream Analytics and Blob storage
  • How to configure and test your Azure Stream Analytics Job with sample data
  • How to run your Azure Stream Analytics Job and test it with real-time data

Want to learn more about Azure certifications?
Check out our Azure Certifications and Learning Paths.

Overview

Azure Stream Analytics is a real-time analytics and complex event-processing engine designed to analyze and process high volumes of fast-streaming data from multiple sources simultaneously. It supports the notion of a Job, each of which consists of an inputquery, and an output. Azure Stream Analytics can ingest data from Azure Event Hubs (including Azure Event Hubs from Apache Kafka), Azure IoT Hub, or Azure Blob Storage. The query, which is based on SQL query language, can be used to easily filter, sort, aggregate, and join streaming data over a period of time.

Assume you have an application that accepts processed orders from customers and sends them to Azure Event Hubs. The requirement is to process the "raw" orders data and enrich it with additional customer info such as name, email, location etc. To get this done, you can build a downstream service that will consume these orders from Event Hubs and process them. In this example, this service happens to be an Azure Stream Analytics job (which we’ll explore later of course!)


You can use Kafka to handle messaging as part of an infrastructure. Want to learn more about producing and consuming messages using simple HTTP requests? Try our hands-on labs to get experience with the requests necessary for consuming Kafka data using REST Proxy.


In order to build this app, we would need to fetch this customer data from an external system (for example, a database) and for each customer ID in the order info, we would query this for the customer details. This will suffice for systems with low-velocity data or where end-to-end processing latency isn’t a concern. But it will pose a challenge for real-time processing on high-velocity streaming data.

Of course, this is not a novel problem! The purpose of this blog post is to showcase how you can use Azure Stream Analytics to implement a solution. Here are the individual components:

Input data source

Azure Stream Analytics jobs connect to one or more data inputs. Each input defines a connection to an existing data source — in this case, its Azure Event Hubs.

An individual order is a JSON payload that looks like this:

{
    "id": "42",
    "custid": "4",
    "amount": "100"
}

Reference data

Customer information is provided as reference data. Although, the customer information is likely to change (e.g., if the customer changes her phone number), for the purposes of this example, we’ll treat it is static reference data stored in Azure Blob Storage container.

Query

This is the workhorse of our solution! It joins (a continuous stream of) orders data from Azure Event Hubs with the static reference customers data based on the matching customer ID (which is id in the customers data set and id in the orders stream)

Output sink

Simply put, an Output lets you store and save the results of the Stream Analytics job. In this example, to keep things simple we continue to use Azure Event Hubs (a different topic) as the output.

Now that you have a conceptual overview, it's time to dive in. All you need is an Azure account. If you don't have it already, just grab one for free.

Initial setup

In this section, you'll:

  • Create Azure Event Hubs namespace and topic
  • Create Azure Blob Storage account and container
  • Create Azure Stream Analytics Job and configure Event Hubs and Blob Storage inputs for the job

Azure Event Hubs

You need to create an Event Hubs Namespace and Hub (topic). There are lots of options including Azure PortalAzure CLIARM template or Azure PowerShell


Azure Resource Manager (ARM) templates provide a powerful way to define Azure resources and configuration using a text file. Use our hands-on labs to learn how to locate and leverage Microsoft’s public Azure quickstart templates.


Please note that you need to create two topics:

  • Input (you can name this orders): Azure Stream Analytics will use this as a (streaming) "source" for orders data
  • Output (you can name this customer-orders): Azure Stream Analytics will use this as a "sink" to store the enriched data as processed by the query

Azure Blob Storage

You’ll need to create an Azure Storage account. This quickstart walks you through this process and provides guidance for Azure Portal, Azure CLI, etc. Once that's done, go ahead and create a container using the Azure Portal or the Azure CLI if you prefer.

Save the JSON below to a file and upload it to the storage container you just created.

[
    {
        "id": "1",
        "name": "Willis Collins",
        "location": "Dallas"
    },
    {
        "id": "2",
        "name": "Casey Brady",
        "location": "Chicago"
    },
    {
        "id": "3",
        "name": "Walker Wong",
        "location": "San Jose"
    },
    {
        "id": "4",
        "name": "Randall Weeks",
        "location": "San Diego"
    },
    {
        "id": "5",
        "name": "Gerardo Dorsey",
        "location": "New Jersey"
    }
]

Azure Stream Analytics

Start by creating an Azure Stream Analytics job. If you want to use the Azure Portal, just follow the steps outlined in this section or use the Azure CLI instead if you don't prefer clicking on a UI.

To configure Azure Event Hubs Input

Open the Azure Stream Analytics job you just created and configure Azure Event Hubs as an Input. Here are some screenshots which should guide you through the steps:

Choose Inputs from the menu on the left

Select + Add stream Input > Event Hub

Enter Event Hubs details — the portal provides you the convenience of choosing from existing Event Hub namespaces and respective Event Hub in your subscription, so all you need to do is choose the right one.

To configure Azure Blob Storage Input:

Choose Inputs from the menu on the left

Select Add reference input > Blob storage

Enter/choose Blob Storage details

Once you're done, you should see the following Inputs:

Configure the query and test with sample data

Azure Stream Analytics allows you to test your streaming queries with sample data. In this section, we’ll upload sample data for orders and customer information for the Event Hubs and Blob Storage inputs respectively.

Upload sample data for orders:

Open the Azure Stream Analytics job, select Query and upload sample orders data for Event Hub input

Save the JSON below to a file and upload it.

[
    {
        "id": "42",
        "custid": "1",
        "amount": "100"
    },
    {
        "id": "43",
        "custid": "2",
        "amount": "200"
    },
    {
        "id": "44",
        "custid": "3",
        "amount": "300"
    },
    {
        "id": "45",
        "custid": "3",
        "amount": "150"
    },
    {
        "id": "46",
        "custid": "4",
        "amount": "170"
    },
    {
        "id": "47",
        "custid": "5",
        "amount": "150"
    },
    {
        "id": "48",
        "custid": "5",
        "amount": "200"
    }

]

Upload sample data for customers

Open the Azure Stream Analytics job, select Query and upload sample orders data for Blob storage input

You can upload the same JSON file that you uploaded to Blob Storage earlier.

Now, configure and run the below query:

SELECT o.id as order_id, o.amount as purchase, o.custid as customer_id, c.name customer_name, c.location as customer_location
FROM orders o
JOIN customers c  
ON o.custid = c.id

Open the Azure Stream Analytics job, select Query and follow the steps as depicted in the screenshot below:

Select Query > enter the query > Test query and don't forget to select Save query

The query JOINs orders data from Event Hubs it with the static reference customers data (from Blob storage) based on the matching customer ID (which is id in the customers data set and id in the orders stream.)

Test the Job with streaming data

It was nice to have the ability to use sample data for testing our streaming solution. Let's go ahead and try this end to end with actual data (orders) flowing into Event Hubs.

An Output is required in order to run a Job. In order to configure the Output, select Output > + Add > Event Hub

Enter Event Hubs details: the portal provides you the convenience of choosing from existing Event Hub namespaces and respective Event Hub in your subscription, so all you need to do is choose the right one.

Start the Job

In the Azure Stream Analytics interface, select Overview, click Start and confirm

Wait for the Job to start, you should see the Status change to Running

Test the end to end flow

To keep things simple, we can use the kafkacat CLI to produce orders and consume enriched events (instead of a program). Just install it and you should be good to go.

Note: Although I have used kafkacat, you're free to choose any other mechanism (CLI or programmatic). This documentation provides lots of examples

Create a kafkacat.conf file with Event Hubs info:

metadata.broker.list=<namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=Endpoint=sb://
    <namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=
     
    
   

Start a consumer to listen from Event Hubs output topic

Let's first start the consumer process that will connect to the output topic (customer-orders) which will get the enriched order information from Azure Stream Analytics

In a terminal:

export KAFKACAT_CONFIG=kafkacat.conf
kafkacat -b <namespace>.servicebus.windows.net:9093 -t customer-orders

//output
% Reading configuration from file kafkacat.conf
% Auto-selecting Consumer mode (use -P or -C to override)
   

This will block, waiting for records from customer-orders.

In another terminal, start sending order info to the orders topic

kafkacat -P -b <namespace>.servicebus.windows.net:9093 -t orders
   

You can send order data via stdout. Simply paste these one at a time and observe the output in the other terminal:

{"id": "22","custid": "1","amount": "100"}
{"id": "23","custid": "2","amount": "200"}
{"id": "24","custid": "3","amount": "300"}
{"id": "25","custid": "4","amount": "400"}
{"id": "26","custid": "15","amount": "500"}

The output you see on the consumer terminal should be similar to this:

...
% Reached end of topic customer-orders [0] at offset 0
{"order_id":"22","purchase":"100","customer_id":"11","customer_name":"Willis Collins","customer_location":"Dallas"}

% Reached end of topic customer-orders [0] at offset 1
{"order_id":"23","purchase":"200","customer_id":"2","customer_name":"Casey Brady","customer_location":"Chicago"
...

Notice how the order info is now enriched with customer data (name, location in this case). You can use the information in this topic anyway you want. For example, you can persist this enriched materialized view to Azure Cosmos DB, trigger an Azure Function, etc.

As expected, you won’t see a corresponding enriched event corresponding to orders placed by customers whose ID isn’t present in the reference customer data (in Blob Storage), since the JOIN criteria is based on the customer ID.

This brings us to the end of this tutorial! I hope it helps you get started with Azure Stream Analytics and test the waters before moving on to more involved use cases.

Where to go next?

In addition to this, there’s plenty of material for you to dig in.

Conclusion

High-velocity, real-time data poses challenges that are hard to deal with using traditional architectures — one such problem is joining these streams of data. Depending on the use case, a custom-built solution might serve you better, but this will take a lot of time and effort to get it right. If possible, you might want to think about extracting parts of your data processing architecture and offloading the heavy lifting to services which are tailor-made for such problems.

In this blog post, we explored a possible solution for implementing streaming joins using a combination of Azure Event Hubs for data ingestion and Azure Stream Analytics for data processing using SQL. These are powerful, off-the-shelf services that you are able to configure and use without setting up any infrastructure, and thanks to the cloud, the underlying complexity of the distributed systems involved in such solutions is completely abstracted from us.

Want to learn more about Azure certifications?
Check out our Azure Certifications and Learning Paths.