使用 ZeroMQ 消息库在 C 和 Python 间共享数据( 二 )

下一步, 计算一些后续要用到的值 。注意下面代码中的 TOPIC,因为 PUB 套接字发送的消息需要绑定一个主题 。主题用于供接收者过滤消息:
const size_t topic_size = strlen(TOPIC);const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);printf("Topic: %s; topic size: %zu; Envelope size: %zun", TOPIC, topic_size, envelope_size);发送消息
启动一个发送消息的循环,循环 REPETITIONS 次:
for (unsigned int i = 0; i < REPETITIONS; i++){    ...发送消息前,先填充一个长度为 PACKET_SIZE 的缓冲区 。本库提供的是 16 个位的有符号整数 。因为 C 语言中 int 类型占用空间大小与平台相关,不是确定的值,所以要使用指定宽度的 int 变量:
int16_t buffer[PACKET_SIZE];for (unsigned int j = 0; j < PACKET_SIZE; j++){    buffer[j] = fancyhw_read_val();}printf("Read %u data valuesn", PACKET_SIZE);消息的准备和发送的第一步是创建 ZeroMQ 消息,为消息分配必要的内存空间 。空白的消息是用于封装要发送的数据的:
zmq_msg_t envelope;const int rmi = zmq_msg_init_size(&envelope, envelope_size);if (rmi != 0){    printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %sn", zmq_strerror(errno));    zmq_msg_close(&envelope);    break;}现在内存空间已分配,数据保存在 ZeroMQ 消息 “信封”中 。函数 zmq_msg_data() 返回一个指向封装数据缓存区顶端的指针 。第一部分是主题,之后是一个空格,最后是二进制数 。主题和二进制数据之间的分隔符采用空格字符 。需要遍历缓存区的话,使用类型转换和 指针算法。(感谢 C 语言,让事情变得直截了当 。)做法如下:
【使用 ZeroMQ 消息库在 C 和 Python 间共享数据】memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t))通过 data_socket 发送消息:
const size_t rs = zmq_msg_send(&envelope, data_socket, 0);if (rs != envelope_size){    printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %sn", zmq_strerror(errno));    zmq_msg_close(&envelope);    break;}使用数据之前要先解除封装:
zmq_msg_close(&envelope);printf("Message sent; i: %u, topic: %sn", i, TOPIC);清理
C 语言不提供 垃圾收集 功能,用完之后记得要自己扫尾 。发送消息之后结束程序之前,需要运行扫尾代码,释放分配的内存:
const int rc = zmq_close(data_socket);if (rc != 0){    printf("ERROR: ZeroMQ error occurred during zmq_close(): %sn", zmq_strerror(errno));    return EXIT_FAILURE;}const int rd = zmq_ctx_destroy(context);if (rd != 0){    printf("Error occurred during zmq_ctx_destroy(): %sn", zmq_strerror(errno));    return EXIT_FAILURE;}return EXIT_SUCCESS;完整 C 代码
保存下面完整的接口代码到本地名为 hw_interface.c 的文件:
// For printf()#include <stdio.h>// For EXIT_*#include <stdlib.h>// For memcpy()#include <string.h>// For sleep()#include <unistd.h>#include <zmq.h>#include "libfancyhw.h"int main(void){    const unsigned int INIT_PARAM = 12345;    const unsigned int REPETITIONS = 10;    const unsigned int PACKET_SIZE = 16;    const char *TOPIC = "fancyhw_data";    fancyhw_init(INIT_PARAM);    void *context = zmq_ctx_new();    if (!context)    {        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %sn", zmq_strerror(errno));        return EXIT_FAILURE;    }    void *data_socket = zmq_socket(context, ZMQ_PUB);    const int rb = zmq_bind(data_socket, "tcp://*:5555");    if (rb != 0)    {        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %sn", zmq_strerror(errno));        return EXIT_FAILURE;    }    const size_t topic_size = strlen(TOPIC);    const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);    printf("Topic: %s; topic size: %zu; Envelope size: %zun", TOPIC, topic_size, envelope_size);    for (unsigned int i = 0; i < REPETITIONS; i++)    {        int16_t buffer[PACKET_SIZE];        for (unsigned int j = 0; j < PACKET_SIZE; j++)        {            buffer[j] = fancyhw_read_val();        }        printf("Read %u data valuesn", PACKET_SIZE);        zmq_msg_t envelope;           const int rmi = zmq_msg_init_size(&envelope, envelope_size);        if (rmi != 0)        {            printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %sn", zmq_strerror(errno));               zmq_msg_close(&envelope);               break;        }               memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);        memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);        memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));           const size_t rs = zmq_msg_send(&envelope, data_socket, 0);        if (rs != envelope_size)        {            printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %sn", zmq_strerror(errno));               zmq_msg_close(&envelope);               break;        }           zmq_msg_close(&envelope);        printf("Message sent; i: %u, topic: %sn", i, TOPIC);        sleep(1);    }    const int rc = zmq_close(data_socket);    if (rc != 0)    {        printf("ERROR: ZeroMQ error occurred during zmq_close(): %sn", zmq_strerror(errno));        return EXIT_FAILURE;    }    const int rd = zmq_ctx_destroy(context);    if (rd != 0)    {        printf("Error occurred during zmq_ctx_destroy(): %sn", zmq_strerror(errno));        return EXIT_FAILURE;    }    return EXIT_SUCCESS;}


推荐阅读