下一步, 计算一些后续要用到的值 。注意下面代码中的 TOPIC,因为 PUB 套接字发送的消息需要绑定一个主题 。主题用于供接收者过滤消息:
const size_t topic_size = strlen(TOPIC);const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);printf("Topic: %s; topic size: %zu; Envelope size: %zun", TOPIC, topic_size, envelope_size);发送消息
启动一个发送消息的循环,循环 REPETITIONS 次:
for (unsigned int i = 0; i < REPETITIONS; i++){ ...发送消息前,先填充一个长度为 PACKET_SIZE 的缓冲区 。本库提供的是 16 个位的有符号整数 。因为 C 语言中 int 类型占用空间大小与平台相关,不是确定的值,所以要使用指定宽度的 int 变量:
int16_t buffer[PACKET_SIZE];for (unsigned int j = 0; j < PACKET_SIZE; j++){ buffer[j] = fancyhw_read_val();}printf("Read %u data valuesn", PACKET_SIZE);消息的准备和发送的第一步是创建 ZeroMQ 消息,为消息分配必要的内存空间 。空白的消息是用于封装要发送的数据的:
zmq_msg_t envelope;const int rmi = zmq_msg_init_size(&envelope, envelope_size);if (rmi != 0){ printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %sn", zmq_strerror(errno)); zmq_msg_close(&envelope); break;}现在内存空间已分配,数据保存在 ZeroMQ 消息 “信封”中 。函数 zmq_msg_data() 返回一个指向封装数据缓存区顶端的指针 。第一部分是主题,之后是一个空格,最后是二进制数 。主题和二进制数据之间的分隔符采用空格字符 。需要遍历缓存区的话,使用类型转换和 指针算法。(感谢 C 语言,让事情变得直截了当 。)做法如下:
【使用 ZeroMQ 消息库在 C 和 Python 间共享数据】memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t))通过 data_socket 发送消息:
const size_t rs = zmq_msg_send(&envelope, data_socket, 0);if (rs != envelope_size){ printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %sn", zmq_strerror(errno)); zmq_msg_close(&envelope); break;}使用数据之前要先解除封装:
zmq_msg_close(&envelope);printf("Message sent; i: %u, topic: %sn", i, TOPIC);清理
C 语言不提供 垃圾收集 功能,用完之后记得要自己扫尾 。发送消息之后结束程序之前,需要运行扫尾代码,释放分配的内存:
const int rc = zmq_close(data_socket);if (rc != 0){ printf("ERROR: ZeroMQ error occurred during zmq_close(): %sn", zmq_strerror(errno)); return EXIT_FAILURE;}const int rd = zmq_ctx_destroy(context);if (rd != 0){ printf("Error occurred during zmq_ctx_destroy(): %sn", zmq_strerror(errno)); return EXIT_FAILURE;}return EXIT_SUCCESS;完整 C 代码
保存下面完整的接口代码到本地名为 hw_interface.c 的文件:
// For printf()#include <stdio.h>// For EXIT_*#include <stdlib.h>// For memcpy()#include <string.h>// For sleep()#include <unistd.h>#include <zmq.h>#include "libfancyhw.h"int main(void){ const unsigned int INIT_PARAM = 12345; const unsigned int REPETITIONS = 10; const unsigned int PACKET_SIZE = 16; const char *TOPIC = "fancyhw_data"; fancyhw_init(INIT_PARAM); void *context = zmq_ctx_new(); if (!context) { printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %sn", zmq_strerror(errno)); return EXIT_FAILURE; } void *data_socket = zmq_socket(context, ZMQ_PUB); const int rb = zmq_bind(data_socket, "tcp://*:5555"); if (rb != 0) { printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %sn", zmq_strerror(errno)); return EXIT_FAILURE; } const size_t topic_size = strlen(TOPIC); const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t); printf("Topic: %s; topic size: %zu; Envelope size: %zun", TOPIC, topic_size, envelope_size); for (unsigned int i = 0; i < REPETITIONS; i++) { int16_t buffer[PACKET_SIZE]; for (unsigned int j = 0; j < PACKET_SIZE; j++) { buffer[j] = fancyhw_read_val(); } printf("Read %u data valuesn", PACKET_SIZE); zmq_msg_t envelope; const int rmi = zmq_msg_init_size(&envelope, envelope_size); if (rmi != 0) { printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %sn", zmq_strerror(errno)); zmq_msg_close(&envelope); break; } memcpy(zmq_msg_data(&envelope), TOPIC, topic_size); memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1); memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t)); const size_t rs = zmq_msg_send(&envelope, data_socket, 0); if (rs != envelope_size) { printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %sn", zmq_strerror(errno)); zmq_msg_close(&envelope); break; } zmq_msg_close(&envelope); printf("Message sent; i: %u, topic: %sn", i, TOPIC); sleep(1); } const int rc = zmq_close(data_socket); if (rc != 0) { printf("ERROR: ZeroMQ error occurred during zmq_close(): %sn", zmq_strerror(errno)); return EXIT_FAILURE; } const int rd = zmq_ctx_destroy(context); if (rd != 0) { printf("Error occurred during zmq_ctx_destroy(): %sn", zmq_strerror(errno)); return EXIT_FAILURE; } return EXIT_SUCCESS;}
推荐阅读
- 6个顶级可视化Python库
- 你平时是怎么管理 Docker 容器的?还在使用一大堆的窗口和命令吗
- |还在流通使用的5角纸币,单张价值22000元,你有吗?
- 龙利鱼|夏季能不能使用面霜?只要用对,效果就会翻倍
- 使用 PowerDNS 轻松配置 DNS 名称服务器
- 如何消息队列的数据积压问题
- 什么是Java可变参数列表?怎么和重载机制配合使用?
- 小米|小米Civi 1S用上骁龙778G Plus!美女产品经理:日常使用非常省心
- 喝茶黑色时段,宋代饮茶为什么流行使用黑色的建盏
- 第一颗使用望远镜发现的行星 一艘飞船飞向黑洞,远方观测者
