原文来自:/uid-24830931-id-3786670.html
一、现象描述
在利用librdkafka同kafka broker通信的过程中,当kafka broker意外退出时(如kill -9),librdkafka内部调用的sendmsg报出了“Program received signal SIGPIPE, Broken pipe.”。这个错误具有典型性:根据网络搜索的结果,它一般是由于向一个已被对端关闭的socket连接或pipe写数据造成的。向有经验的同事请教,他们说这种场景不会出现SIGPIPE信号,而是send、write、sendmsg等直接返回-1,同时errno被设置成EPIPE。
实践是检验真理的唯一标准,找个例子一试便知。
二、例子程序
为了快速检验,从网上借了一个简单的客户端、服务器程序(/dlpucat/item/97ab75c5243b8761f6c95d75),多谢原作者。服务器端程序 server.c:
点击(此处)折叠或打开
/*
 * server.c - minimal TCP server used to reproduce SIGPIPE on the client side.
 *
 * Listens on HELLO_WORLD_SERVER_PORT, sends one fixed-size greeting to each
 * accepted client, then prints everything that client sends until the client
 * closes the connection or a receive error occurs.
 */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define HELLO_WORLD_SERVER_PORT 6666
#define LENGTH_OF_LISTEN_QUEUE 20
#define BUFFER_SIZE 1024

/*
 * Entry point. Command-line arguments are not used.
 *
 * Returns 0 after accept() fails; exits with status 1 if the listening
 * socket cannot be created, bound, or put into the listening state.
 */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    /* s_addr is a 32-bit field, so the long conversion is the correct one. */
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(HELLO_WORLD_SERVER_PORT);

    int server_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (server_socket < 0) {
        printf("Create Socket Failed!");
        exit(1);
    }

    if (bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr))) {
        printf("Server Bind Port : %d Failed!", HELLO_WORLD_SERVER_PORT);
        exit(1);
    }

    if (listen(server_socket, LENGTH_OF_LISTEN_QUEUE)) {
        printf("Server Listen Failed!");
        exit(1);
    }

    for (;;) {
        struct sockaddr_in client_addr;
        socklen_t length = sizeof(client_addr);
        int new_server_socket =
            accept(server_socket, (struct sockaddr *)&client_addr, &length);
        if (new_server_socket < 0) {
            printf("Server Accept Failed!\n");
            break;
        }

        /* The greeting is sent as a zero-padded, fixed-size BUFFER_SIZE record. */
        char buffer[BUFFER_SIZE];
        memset(buffer, 0, BUFFER_SIZE);
        strcpy(buffer, "Hello,World from server!");
        strcat(buffer, "\n");
        if (send(new_server_socket, buffer, BUFFER_SIZE, 0) < 0) {
            perror("send");
            close(new_server_socket);
            continue;
        }

        for (;;) {
            /*
             * recv() returns ssize_t; storing it in an unsigned socklen_t
             * would make the error check below impossible to trigger.
             */
            ssize_t received = recv(new_server_socket, buffer, BUFFER_SIZE, 0);
            if (received < 0) {
                printf("Server Recieve Data Failed!\n");
                break;
            }
            if (received == 0) {
                /* Peer closed the connection: stop reading instead of spinning. */
                break;
            }
            /* Bound the print by the byte count; the data may lack a NUL. */
            printf("\n%.*s", (int)received, buffer);
            fflush(stdout);
        }
        close(new_server_socket);
    }

    close(server_socket);
    return 0;
}
客户端程序
点击(此处)折叠或打开
/*
 * client.c - minimal TCP client used to reproduce SIGPIPE.
 *
 * Connects to the server given on the command line, prints the server's
 * greeting, then sends a fixed-size message once per second. SIGPIPE is
 * deliberately left at its default disposition so that sending after the
 * server has gone away terminates the process with "Broken pipe".
 */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <unistd.h>

#define HELLO_WORLD_SERVER_PORT 6666
#define BUFFER_SIZE 1024

/*
 * Entry point.
 *
 * argv[1] must be the server's IPv4 address in dotted-decimal form.
 *
 * Returns 0 after send() fails with EPIPE (only reachable when SIGPIPE is
 * ignored, blocked or handled); exits with status 1 on usage, socket, bind,
 * address, connect or receive errors.
 */
int main(int argc, char **argv)
{
    if (argc != 2) {
        /* argv[0] already carries any "./" prefix the user typed. */
        printf("Usage: %s ServerIPAddress\n", argv[0]);
        exit(1);
    }

    struct sockaddr_in client_addr;
    memset(&client_addr, 0, sizeof(client_addr));
    client_addr.sin_family = AF_INET;
    /* s_addr is a 32-bit field, so the long conversion is the correct one. */
    client_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    client_addr.sin_port = htons(0); /* port 0: let the system pick one */

    int client_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (client_socket < 0) {
        printf("Create Socket Failed!\n");
        exit(1);
    }

    if (bind(client_socket, (struct sockaddr *)&client_addr, sizeof(client_addr))) {
        printf("Client Bind Port Failed!\n");
        exit(1);
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    /* inet_pton() returns 1 only when the text is a valid IPv4 address. */
    if (inet_pton(AF_INET, argv[1], &server_addr.sin_addr) != 1) {
        printf("Server IP Address Error!\n");
        exit(1);
    }
    server_addr.sin_port = htons(HELLO_WORLD_SERVER_PORT);

    socklen_t server_addr_length = sizeof(server_addr);
    if (connect(client_socket, (struct sockaddr *)&server_addr, server_addr_length) < 0) {
        printf("Can Not Connect To %s!\n", argv[1]);
        exit(1);
    }

    char buffer[BUFFER_SIZE];
    memset(buffer, 0, BUFFER_SIZE);
    ssize_t length = recv(client_socket, buffer, BUFFER_SIZE, 0);
    if (length < 0) {
        printf("Recieve Data From Server %s Failed!\n", argv[1]);
        exit(1);
    }
    /* Bound the print by the byte count; the data may lack a NUL. */
    printf("From Server %s :\t%.*s", argv[1], (int)length, buffer);

    /* The message is sent as a zero-padded, fixed-size BUFFER_SIZE record. */
    memset(buffer, 0, BUFFER_SIZE);
    strcpy(buffer, "Hello, World! From Client\n");

    for (;;) {
        sleep(1);
        /*
         * With the default SIGPIPE disposition the process is killed inside
         * send() once the peer is gone; the EPIPE branch below runs only
         * when SIGPIPE is ignored, blocked or handled.
         */
        ssize_t ret = send(client_socket, buffer, BUFFER_SIZE, 0);
        if (ret == -1) {
            if (errno == EPIPE) {
                printf("receive sigpipe\n");
                break; /* the connection is gone; stop sending */
            }
            /* Other errors are reported and the send is retried. */
            perror("send");
        }
    }

    close(client_socket);
    return 0;
}
三、重现方法
step 1)编译:gcc -o server server.c;gcc -g -o client client.c(客户端带-g编译,便于通过gdb直接看到异常退出)
step 2)启动服务器端:./server
step 3) 启动客户端:(这里假设客户端和服务器部署在同一台服务器)gdb ./client
(gdb) r 127.0.0.1
step 4) 观察正常运行结果:首先是客户端收到服务器端的消息:From Server 127.0.0.1 : Hello,World from server!
然后是服务器端每隔1s收到客户端的消息:Hello, World! From Client
step 5)通过ctrl+c关闭服务器端
step 6)观察客户端结果
Program received signal SIGPIPE, Broken pipe.
0x0000003a7fcd55f5 in send () from /lib64/libc.so.6
重现了!!
四、解决办法
解决办法很多,也很简单。4.1 client中忽略SIGPIPE信号
点击(此处)折叠或打开
/* Ignore SIGPIPE so that a send on a broken connection fails with -1/EPIPE
 * instead of terminating the process. Call this before the first send. */
signal(SIGPIPE,SIG_IGN);
4.2 阻止SIGPIPE信号(后来追查,原来同事的程序框架中已经有了这种机制,所以没有经历过程序退出的问题)
点击(此处)折叠或打开
sigset_tset;
sigemptyset(&set);
sigaddset(&set,SIGPIPE);
sigprocmask(SIG_BLOCK,&set,NULL);
4.3 为SIGPIPE添加信号处理函数,处理完程序继续执行
点击(此处)折叠或打开
/* Install pipesig_handler as the SIGPIPE handler so the program keeps running
 * after the signal is delivered.
 * NOTE(review): pipesig_handler is not shown in this article; it presumably
 * has the form `void pipesig_handler(int signo)` - confirm before use. */
signal(SIGPIPE,pipesig_handler);
多种选择,总有一款适合您。
经验证测试,第2种方法可以屏蔽SIGPIPE信号(不再出现Broken pipe导致的退出);此时客户端send的返回值为-1,errno为EPIPE,可据此进行错误处理。