#include <stdio.h>#include <pthread.h>#include <stdio.h>#include <unistd.h>#include <netinet/in.h>#include <sys/socket.h>#include <sys/wait.h>#include <sys/types.h>#include <string.h>#include <errno.h>#include <arpa/inet.h>#include <time.h>#include <sys/time.h>#include <signal.h>#define RESV_BUFF_SIZE 512#define SEND_BUFF_SIZE 1024/** * redis服务端的IP地址 */#define DEST_ADDR "172.20.23.83"/** * redis服务端的端口号 */#define DEST_PORT 6379/** * 每个线程总共执行插入动作的次数 */#define TRANS_PER_THREAD 10000/** * 每隔多少次插入进行一次日志打印 */#define LOG_STEP 255/** * 总共开启线程数量 */#define THREADS_NUM 100int numLen(int num) { int i = 0; do { num /= 10; i++; } while (num != 0); return i;}/** * 线程执行代码 * @param p 从0-99的线程编号,用于确定以后线程执行插入的数据段 * 比方编号为1,则插入TRANS_PER_THREAD*1 至 TRANS_PER_THREAD*1 + TRANS_PER_THREAD 区间的数据 */void threadMain(void *p){ int sockfd,*index=p,rlen; *index *= TRANS_PER_THREAD; int end = *index + TRANS_PER_THREAD; struct sockaddr_in dest_addr; bzero(&(dest_addr), sizeof(dest_addr)); char resvbuf[RESV_BUFF_SIZE]; char sendbuf[SEND_BUFF_SIZE]; sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { printf("socket create failed:%d!\n",sockfd); } dest_addr.sin_family = AF_INET; dest_addr.sin_port = htons(DEST_PORT); inet_pton(AF_INET,DEST_ADDR,&dest_addr.sin_addr); if (connect(sockfd,(struct sockaddr*)&dest_addr, sizeof(struct sockaddr)) == -1) { printf("connect failed:%d!\n",errno); perror("error: "); } else { printf("connect success!\n"); struct timeval tv,ltv; for (int i = *index; i < end; ++i) { rlen = numLen(i); memset(sendbuf,0,SEND_BUFF_SIZE); /** * 结构当次申请发送的数据 * 需满足RESP协定标准 */ sprintf(sendbuf,"*4\r\n$4\r\nHSET\r\n$6\r\nmigkey\r\n$%d\r\nmest%d\r\n$%d\r\nmest67890%d\r\n",rlen+4,i,rlen+9,i); write(sockfd,sendbuf,strlen(sendbuf)); /** * 尽管咱们并不关怀服务端的返回 * 此处仍然进行了读取操作,防止缓冲堆满 */ read(sockfd,resvbuf,RESV_BUFF_SIZE); /** * 每执行LOG_STEP+1次就打印一条日志信息 * LOG_STEP需满足(2^n - 1) */ if ((i & LOG_STEP) == 0) { gettimeofday(&tv, NULL); if (i > *index) { printf("used %ld.%d seconds.\n",tv.tv_sec - ltv.tv_sec,tv.tv_usec - ltv.tv_usec); fflush(stdout); } ltv = tv; } } } printf("finished index:%d.\n",*index); close(sockfd);}void handle_pipe(int sig) {// printf("sig %d ignore.\n",sig);}int main() { /** * 因为tcp的client敞开后,服务端依然有可能向咱们发送数据 * 这样会造成咱们的过程收到SIGPIPE信号 * 所以此处注册信号处理函数 */ struct sigaction sa; sa.sa_handler = handle_pipe; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; sigaction(SIGPIPE,&sa,NULL); pthread_t pts[THREADS_NUM]; int indexs[THREADS_NUM]; struct timeval tv,ltv; gettimeofday(<v, NULL); /** * 创立线程 */ for (int i = 0; i < THREADS_NUM; ++i) { indexs[i] = i; if (pthread_create(&pts[i], NULL, threadMain, &indexs[i]) != 0) { printf("create thread error!\n"); } else { printf("thread %d created!\n",i); } } /** * 期待线程 */ for (int i = 0; i < THREADS_NUM; ++i) { pthread_join(pts[i], NULL); } gettimeofday(&tv, NULL); printf("\n------All finished!------\n""used %ld.%d seconds.\n",tv.tv_sec - ltv.tv_sec,tv.tv_usec - ltv.tv_usec); return 0;}