#include <stdio.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
#include <signal.h>
#define RESV_BUFF_SIZE 512
#define SEND_BUFF_SIZE 1024
/**
* redis 服务端的 IP 地址
*/
#define DEST_ADDR "172.20.23.83"
/**
* redis 服务端的端口号
*/
#define DEST_PORT 6379
/**
* 每个线程总共执行插入动作的次数
*/
#define TRANS_PER_THREAD 10000
/**
* 每隔多少次插入进行一次日志打印
*/
#define LOG_STEP 255
/**
* 总共开启线程数量
*/
#define THREADS_NUM 100
int numLen(int num) {
int i = 0;
do {
num /= 10;
i++;
} while (num != 0);
return i;
}
/**
* 线程执行代码
* @param p 从 0 -99 的线程编号,用于确定以后线程执行插入的数据段
* 比方编号为 1,则插入 TRANS_PER_THREAD*1 至 TRANS_PER_THREAD*1 + TRANS_PER_THREAD 区间的数据
*/
void threadMain(void *p)
{
int sockfd,*index=p,rlen;
*index *= TRANS_PER_THREAD;
int end = *index + TRANS_PER_THREAD;
struct sockaddr_in dest_addr;
bzero(&(dest_addr), sizeof(dest_addr));
char resvbuf[RESV_BUFF_SIZE];
char sendbuf[SEND_BUFF_SIZE];
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {printf("socket create failed:%d!\n",sockfd);
}
dest_addr.sin_family = AF_INET;
dest_addr.sin_port = htons(DEST_PORT);
inet_pton(AF_INET,DEST_ADDR,&dest_addr.sin_addr);
if (connect(sockfd,(struct sockaddr*)&dest_addr, sizeof(struct sockaddr)) == -1) {printf("connect failed:%d!\n",errno);
perror("error:");
} else {printf("connect success!\n");
struct timeval tv,ltv;
for (int i = *index; i < end; ++i) {rlen = numLen(i);
memset(sendbuf,0,SEND_BUFF_SIZE);
/**
* 结构当次申请发送的数据
* 需满足 RESP 协定标准
*/
sprintf(sendbuf,"*4\r\n$4\r\nHSET\r\n$6\r\nmigkey\r\n$%d\r\nmest%d\r\n$%d\r\nmest67890%d\r\n",rlen+4,i,rlen+9,i);
write(sockfd,sendbuf,strlen(sendbuf));
/**
* 尽管咱们并不关怀服务端的返回
* 此处仍然进行了读取操作,防止缓冲堆满
*/
read(sockfd,resvbuf,RESV_BUFF_SIZE);
/**
* 每执行 LOG_STEP+ 1 次就打印一条日志信息
* LOG_STEP 需满足(2^n - 1)*/
if ((i & LOG_STEP) == 0) {gettimeofday(&tv, NULL);
if (i > *index) {printf("used %ld.%d seconds.\n",tv.tv_sec - ltv.tv_sec,tv.tv_usec - ltv.tv_usec);
fflush(stdout);
}
ltv = tv;
}
}
}
printf("finished index:%d.\n",*index);
close(sockfd);
}
void handle_pipe(int sig) {// printf("sig %d ignore.\n",sig);
}
int main() {
/**
* 因为 tcp 的 client 敞开后,服务端依然有可能向咱们发送数据
* 这样会造成咱们的过程收到 SIGPIPE 信号
* 所以此处注册信号处理函数
*/
struct sigaction sa;
sa.sa_handler = handle_pipe;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGPIPE,&sa,NULL);
pthread_t pts[THREADS_NUM];
int indexs[THREADS_NUM];
struct timeval tv,ltv;
gettimeofday(<v, NULL);
/**
* 创立线程
*/
for (int i = 0; i < THREADS_NUM; ++i) {indexs[i] = i;
if (pthread_create(&pts[i], NULL, threadMain, &indexs[i]) != 0) {printf("create thread error!\n");
} else {printf("thread %d created!\n",i);
}
}
/**
* 期待线程
*/
for (int i = 0; i < THREADS_NUM; ++i) {pthread_join(pts[i], NULL);
}
gettimeofday(&tv, NULL);
printf("\n------All finished!------\n""used %ld.%d seconds.\n",tv.tv_sec - ltv.tv_sec,tv.tv_usec - ltv.tv_usec);
return 0;
}