« 返回到所有博客帖子

将OPC UA发布端(Publisher)通过MQTT协议连接到Azure

14.07.2021


更新日期: 2021-09-24

Prosys OPC UA SDK for Java实现了OPC UA PubSub(Publisher/Subscriber,即发布端/订阅端)功能,这给了我们一个很好的机会利用这些最新功能来更新我们的Azure演示。我们之前的两篇博文描述了我们在这个项目上完成的工作:第一篇第二篇

在上一次迭代中,我们使用Microsoft OPC publisher模块将数据从OPC UA服务器传输到Azure。因为Azure的IoT Hub也允许用户使用<a href=”https://mqtt.org/”<MQTT</a>协议进行直接连接,因此我们希望用我们自己的PubSub发布端替换该模块。由于我们由Raspberry Pi驱动的气象站日渐陈旧,我们决定将其拆除,并用模拟数据源替换。

注意: 该方案需要Prosys OPC UA SDK for Java 4.6.0版

Azure IoT Hub via MQTT

使用MQTT协议将数据传输到Azure

Azure IoT Hub提供端点,允许IoT设备使用MQTT协议与云通信。尽管IoT Hub不是功能齐全的MQTT代理,但它仍然支持接收OPC UA PubSub消息。连接到IoT Hub的事件允许进一步处理设备到云消息中发送的数据。

OPC UA 发布端 (Publisher)

OPC UA PubSub是一种新的通信机制,被定义为基本OPC UA客户端/服务器通信模式的替代方案。PubSub支持几种不同的替代传输协议。MQTT适合于云通信,并得到大多数云产品的广泛支持。

OPC UA 发布端是一个被配置为向MQTT代理发送固定数据集的应用程序,MQTT代理随后可以将其分发给多个订阅端。

使用Prosys OPC UA SDK for Java设置一个连接到Azure IoT Hub的OPC UA MQTT发布端很容易:SDK包含一个示例发布端,可用于快速入门。只要向发布端提供与IoT Hub中定义的设备相对应的正确凭证,就可以使用由TLS协议保护的MQTT连接发送JSON编码的OPC UA数据。在SamplePublisherServer中,可以通过使用以下命令行参数启动服务器来完成此操作:

>samplepublisherserver.bat 
  --address ssl://<IoT Hub hostname>:8883 
  --encoding json 
  --username <IoT Hub hostname>/<device id>/?api-version=2018-06-30 
  --password <device SAS token> 
  --client-id <device id> 
  --queue-name devices/<device id>/messages/events/ 
  --metadata-queue-name devices/<device id>/messages/events/
  --non-reversible-json

在一行中键入所有内容,在Linux和macOS中,使用samplepublisherserver.sh而不是.bat。用IoT Hub实例的实际主机名替换<IoT Hub hostname>,用Azure中标识的设备id替换<device id><device SAS token>是设备的IoT Hub安全令牌,也被称为设备的 “共享访问签名(SAS)” 。

发布端现在将以标准OPC UA PubSub JSON格式发送消息。默认情况下,示例发布端将包含OPC UA网络报头,可用于过滤掉一些数据,但对于Azure,我们需要使用--non-reversible-json参数,这将使发布端发送普通的name/value对,例如

[{
  "MyLevel":{
    "Value":73.0,
    "SourceTimestamp":"2021-07-07T17:36:29+0300"
  },
  "MyLevelDisplayName":{
    "Value":"MyLevel"
  }
}]

.NET Core事件处理器

通过使用Azure事件中心客户端库 (Azure Event Hubs client library),我们创建了一个带有BackgroundService的.NET核心应用程序,以通过IoT Hub Event Hub兼容端点接收和处理设备消息。

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
		string consumerGroup = "azure";

		// Create a blob container client that the event processor will use 
		BlobContainerClient storageClient =
				new BlobContainerClient(_blobStorageConnectionString, <CONTAINER_NAME>);

		// Create an event processor client to process events in the event hub
		EventProcessorClient processor =    
				new EventProcessorClient(storageClient, consumerGroup, _eventHubConnectionString, <EVENT HUB NAME>);

		// Register handlers for processing events and handling errors
		processor.ProcessEventAsync += ProcessEventHandler;
		processor.ProcessErrorAsync += ProcessErrorHandler;
		_logger.LogInformation("Starting event processor");
		// Start the processing
		await processor.StartProcessingAsync(stoppingToken);

}

接收到的消息被处理成兼容的格式,然后作为POST请求发送到我们的Power BI数据集推送API端点。

async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
		var deviceMessagesJsonString = Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray());

		List<DeviceMessage> deviceMessages;
		try
		{
				deviceMessages = JsonSerializer.Deserialize<List<DeviceMessage>>(deviceMessagesJsonString);
		}
		catch(Exception e)
		{
				_logger.LogError("failed to deserialize into normal device message");
				return;
		}

		var result = new List<MyLevel>();
		foreach (var message in deviceMessages)
		{
				//Only process recent messages as PowerBi push-datasets have limit of 1 req/second
				if (message.MyLevel != null && eventArgs.Data.EnqueuedTime.AddSeconds(20) > DateTime.UtcNow)
				{
						result.Add(message.MyLevel);
				}
		}

		if (result.Count == 0)
		{
				return;
		}
		var content = new StringContent(
				JsonSerializer.Serialize(result),
				Encoding.UTF8,
				"application/json");

		var client = _clientFactory.CreateClient();
		var res = await client.PostAsync(
				<POWERBI_PUSH_URL>,
				content);
		_logger.LogInformation(res.StatusCode.ToString());
}

我们使用Azure应用程序服务来托管应用程序。有关如何使用Event Hubs客户端库接收和处理事件的完整说明,请参见此处

Power BI

Power BI提供了多种方法,可将数据转换为不同类型的实时图形、仪表盘和报表,这些数据可以嵌入到您自己的应用程序或其他Microsoft服务(如SharePoint)中。由于实时更新的仪表盘无法共享给没有Power BI Pro许可证的用户,我们因此决定在Power BI中生成一个静态图形,该图形可以通过简单的html标记嵌入到网站中。发布到web图形数据大约每小时缓存和更新一次,这意味着在我们的网页上查看时,它不一定提供最新的数据点。由我们的数据生成的图表可以在这里查看。

结论

我们演示了Prosys OPC UA SDK for Java能够将OPC UA数据直接发布到Azure,并从Azure将数据发送到Power BI进行可视化处理。虽然我们演示概括了一些相对基本的层面,但该软件实现可以扩展到更复杂的使用场景。

您想了解更多吗?

如果您有兴趣开发基于Azure或其他云系统的OPC UA连接,请随时与我们联系。我们可以为您提供软件工具和专业服务,在这个快速增长的新市场中实现快速发展。

您可以发送邮件到sales@prosysopc.com 或访问我们的网站联系我们.

Tags: Azure, IoT, Cloud, OPC UA, OPC UA PubSub, MQTT, Java, Power BI

comments powered by Disqus

Prosys OPC Ltd

Prosys OPC是OPC和OPC UA软件领域中拥有20年技术经验的行业佼佼者。 OPC和OPC UA(Unified Architecture)是工业和高科技公司使用的通信标准。

了解更多关于我们的信息 »

最新博客文章

将OPC UA发布端(Publisher)通过MQTT协议连接到Azure

Prosys OPC UA SDK for Java支持使用MQTT协议将OPC UA数据发布到Azure

OPC UA在线课程简介-说明

该博客文章逐步描述了TalentLMS的注册和课程付费过程

OPC UA与构建制药业的信息模型

硕士论文评估OPC UA协议和信息建模,用于严格管制的制造领域

查看所有博客文章 »