一、现象描述


在利用librdkafka同kafka broker通信过程中,当kafka broker意外退出时(如kill -9),librdkafka接口的sendmsg接口报出了“Program received signal SIGPIPE, Broken pipe.” 这个错误具有典型性,根据网络搜索的结果,这个一般是由于向一个被破坏的socket连接或者pipe读写数据造成的,向有经验的同事请教,他们说这种场景不会出现SIGPIPE信号,而是直接send, write, sendmsg等返回-1,同时errno会被设置成EPIPE。

实践是检验真理的唯一标准,找个例子一试便知。

二、例子程序

为了快速检验,从网上上借了一个简单的客户端、服务器程序, http://hi.baidu.com/dlpucat/item/97ab75c5243b8761f6c95d75,多谢原作者。

服务器端程序 server.c

点击(此处)折叠或打开

  1. #include <netinet/in.h>
  2. #include <sys/types.h>
  3. #include <sys/socket.h>
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>

  7. #define HELLO_WORLD_SERVER_PORT 6666
  8. #define LENGTH_OF_LISTEN_QUEUE 20
  9. #define BUFFER_SIZE 1024

  10. int main(int argc, char **argv)
  11. {
  12.   struct sockaddr_in server_addr;
  13.   bzero(&server_addr,sizeof(server_addr));
  14.   server_addr.sin_family = AF_INET;
  15.   server_addr.sin_addr.s_addr = htons(INADDR_ANY);
  16.   server_addr.sin_port = htons(HELLO_WORLD_SERVER_PORT);

  17.   int server_socket = socket(AF_INET,SOCK_STREAM,0);
  18.   if( server_socket < 0)
  19.   {
  20.     printf("Create Socket Failed!");
  21.     exit(1);
  22.   }

  23.   if( bind(server_socket,(struct sockaddr*)&server_addr,sizeof(server_addr)))
  24.   {
  25.     printf("Server Bind Port : %d Failed!", HELLO_WORLD_SERVER_PORT);
  26.     exit(1);
  27.   }

  28.   if ( listen(server_socket, LENGTH_OF_LISTEN_QUEUE) )
  29.   {
  30.     printf("Server Listen Failed!");
  31.     exit(1);
  32.   }

  33.   while (1)
  34.   {
  35.     struct sockaddr_in client_addr;
  36.     socklen_t length = sizeof(client_addr);

  37.     int new_server_socket = accept(server_socket,(struct sockaddr*)&client_addr,&length);
  38.     if ( new_server_socket < 0)
  39.     {
  40.       printf("Server Accept Failed!\n");
  41.       break;
  42.     }

  43.     char buffer[BUFFER_SIZE];
  44.     bzero(buffer, BUFFER_SIZE);
  45.     strcpy(buffer,"Hello,World from server!");
  46.     strcat(buffer,"\n");
  47.     send(new_server_socket,buffer,BUFFER_SIZE,0);

  48.     bzero(buffer,BUFFER_SIZE);
  49.         while(1){
  50.       length = recv(new_server_socket,buffer,BUFFER_SIZE,0);
  51.       if (length < 0)
  52.       {
  53.         printf("Server Recieve Data Failed!\n");
  54.         exit(1);
  55.       }
  56.       printf("\n%s",buffer);
  57.         }
  58.     close(new_server_socket);
  59.   }
  60.   close(server_socket);
  61.   return 0;
  62. }

客户端程序

点击(此处)折叠或打开

  1. #include <netinet/in.h>
  2. #include <sys/types.h>
  3. #include <sys/socket.h>
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include <signal.h>
  8. #include <errno.h>

  9. #define HELLO_WORLD_SERVER_PORT 6666
  10. #define BUFFER_SIZE 1024

  11. int main(int argc, char **argv)
  12. {
  13.   if (argc != 2)
  14.   {
  15.     printf("Usage: ./%s ServerIPAddress\n",argv[0]);
  16.     exit(1);
  17.   }

  18.   struct sockaddr_in client_addr;
  19.   bzero(&client_addr,sizeof(client_addr));
  20.   client_addr.sin_family = AF_INET;
  21.   client_addr.sin_addr.s_addr = htons(INADDR_ANY);
  22.   client_addr.sin_port = htons(0);

  23.   int client_socket = socket(AF_INET,SOCK_STREAM,0);

  24.   if( client_socket < 0)
  25.   {
  26.     printf("Create Socket Failed!\n");
  27.     exit(1);
  28.   }

  29.   if( bind(client_socket,(struct sockaddr*)&client_addr,sizeof(client_addr)))
  30.   {
  31.     printf("Client Bind Port Failed!\n");
  32.     exit(1);
  33.   }

  34.   struct sockaddr_in server_addr;
  35.   bzero(&server_addr,sizeof(server_addr));
  36.   server_addr.sin_family = AF_INET;
  37.   if(inet_aton(argv[1],&server_addr.sin_addr) == 0)
  38.   {
  39.     printf("Server IP Address Error!\n");
  40.     exit(1);
  41.   }
  42.   server_addr.sin_port = htons(HELLO_WORLD_SERVER_PORT);
  43.   socklen_t server_addr_length = sizeof(server_addr);
  44.   if(connect(client_socket,(struct sockaddr*)&server_addr, server_addr_length) < 0)
  45.   {
  46.     printf("Can Not Connect To %s!\n",argv[1]);
  47.     exit(1);
  48.   }

  49.   char buffer[BUFFER_SIZE];
  50.   bzero(buffer,BUFFER_SIZE);
  51.   int length = recv(client_socket,buffer,BUFFER_SIZE,0);
  52.   if(length < 0)
  53.   {
  54.     printf("Recieve Data From Server %s Failed!\n", argv[1]);
  55.     exit(1);
  56.   }
  57.   printf("From Server %s :\t%s",argv[1],buffer);

  58.   bzero(buffer,BUFFER_SIZE);
  59.   strcpy(buffer,"Hello, World! From Client\n");

  60.   while(1){
  61.     sleep(1);
  62.     int ret = send(client_socket,buffer,BUFFER_SIZE,0);
  63.         if (ret == -1 && errno == EPIPE){
  64.       printf("receive sigpipe\n");
  65.     }
  66.   }

  67.   close(client_socket);
  68.   return 0;
  69. }

三、重现方法

step 1)编译: gcc -o server server.c
          gcc -o -g client client.c (通过gdb直接看到异常退出)

step 2)启动服务器端:./server

step 3) 启动客户端: (这里假设客户端和服务器部署在同一台服务器 gdb ./client
(gdb) r 127.0.0.1

step 4) 观察正常运行结果:首先是客户端收到服务器端的消息:From Server 127.0.0.1 : Hello,World from server!
         然后是服务器端每隔1s收到客户端的消息: Hello, World! From Client

step 5)通过ctrl+c关闭服务器端

step 6)观察客户端结果
Program received signal SIGPIPE, Broken pipe.
0x0000003a7fcd55f5 in send () from /lib64/libc.so.6

重现了!!

四、解决办法

解决办法很多,也很简单。

4.1 client中忽略SIGPIPE信号

点击(此处)折叠或打开

  1. signal(SIGPIPE, SIG_IGN);

4.2 阻止SIGPIPE信号(后来追查,原来同事的程序框架中已经有了这种机制,所以没有经历过程序退出的问题)

点击(此处)折叠或打开

  1. sigset_t set;
  2. sigemptyset(&set);
  3. sigaddset(&set, SIGPIPE);
  4. sigprocmask(SIG_BLOCK, &set, NULL);

4.3  为SIGPIPE添加信号处理函数,处理完程序继续执行

点击(此处)折叠或打开

  1. signal(SIGPIPE, pipesig_handler);

多种选择,总有一款适合您。
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐