Make ping callback unlocked

This commit is contained in:
Nick Peng
2018-06-11 00:00:10 +08:00
parent ec83f75582
commit 8cd0edad4a
2 changed files with 755 additions and 725 deletions

View File

@@ -323,6 +323,7 @@ void _dns_server_ping_result(struct ping_host_struct *ping_host, const char *hos
if (result == PING_RESULT_END) { if (result == PING_RESULT_END) {
_dns_server_request_release(request); _dns_server_request_release(request);
fast_ping_stop(ping_host);
return; return;
} }

View File

@@ -60,8 +60,10 @@ struct fast_ping_packet {
struct ping_host_struct { struct ping_host_struct {
atomic_t ref; atomic_t ref;
atomic_t notified;
struct hlist_node host_node; struct hlist_node host_node;
struct hlist_node addr_node; struct hlist_node addr_node;
struct list_head action_list;
FAST_PING_TYPE type; FAST_PING_TYPE type;
void *userptr; void *userptr;
@@ -136,7 +138,7 @@ void _fast_ping_install_filter_v6(int sock)
BPF_STMT(BPF_RET | BPF_K, ~0U), /* No. It passes. This must not happen. */ BPF_STMT(BPF_RET | BPF_K, ~0U), /* No. It passes. This must not happen. */
BPF_STMT(BPF_RET | BPF_K, 0), /* Echo with wrong ident. Reject. */ BPF_STMT(BPF_RET | BPF_K, 0), /* Echo with wrong ident. Reject. */
}; };
static struct sock_fprog filter = { sizeof insns / sizeof(insns[0]), insns }; static struct sock_fprog filter = {sizeof insns / sizeof(insns[0]), insns};
if (once) { if (once) {
return; return;
@@ -165,7 +167,7 @@ void _fast_ping_install_filter_v4(int sock)
BPF_STMT(BPF_RET | BPF_K, 0) /* Echo with wrong ident. Reject. */ BPF_STMT(BPF_RET | BPF_K, 0) /* Echo with wrong ident. Reject. */
}; };
static struct sock_fprog filter = { sizeof insns / sizeof(insns[0]), insns }; static struct sock_fprog filter = {sizeof insns / sizeof(insns[0]), insns};
if (once) { if (once) {
return; return;
@@ -234,47 +236,49 @@ static void _fast_ping_host_get(struct ping_host_struct *ping_host)
atomic_inc(&ping_host->ref); atomic_inc(&ping_host->ref);
} }
static void _fast_ping_host_put_locked(struct ping_host_struct *ping_host, int locked) static void _fast_ping_host_put(struct ping_host_struct *ping_host)
{ {
if (locked) { if (!atomic_dec_and_test(&ping_host->ref)) {
if (atomic_dec_and_test(&ping_host->ref)) {
hash_del(&ping_host->host_node);
hash_del(&ping_host->addr_node);
} else {
ping_host = NULL;
}
} else {
pthread_mutex_lock(&ping.map_lock);
if (atomic_dec_and_test(&ping_host->ref)) {
hash_del(&ping_host->host_node);
hash_del(&ping_host->addr_node);
} else {
ping_host = NULL;
}
pthread_mutex_unlock(&ping.map_lock);
}
if (ping_host == NULL) {
return; return;
} }
pthread_mutex_lock(&ping.map_lock);
hash_del(&ping_host->host_node);
hash_del(&ping_host->addr_node);
pthread_mutex_unlock(&ping.map_lock);
if (atomic_inc_return(&ping_host->notified) == 1) {
struct timeval tv; struct timeval tv;
tv.tv_sec = 0; tv.tv_sec = 0;
tv.tv_usec = 0; tv.tv_usec = 0;
ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_END, &ping_host->addr, ping_host->addr_len, ping_host->seq, &tv, ping_host->userptr); ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_END, &ping_host->addr, ping_host->addr_len, ping_host->seq, &tv, ping_host->userptr);
}
if (ping_host->fd > 0) { if (ping_host->fd > 0) {
close(ping_host->fd); close(ping_host->fd);
ping_host->fd = -1; ping_host->fd = -1;
} }
tlog(TLOG_DEBUG, "ping %p end", ping_host);
free(ping_host); free(ping_host);
} }
static void _fast_ping_host_put(struct ping_host_struct *ping_host) static void _fast_ping_host_remove(struct ping_host_struct *ping_host)
{ {
_fast_ping_host_put_locked(ping_host, 0); pthread_mutex_lock(&ping.map_lock);
hash_del(&ping_host->addr_node);
pthread_mutex_unlock(&ping.map_lock);
if (atomic_inc_return(&ping_host->notified) == 1) {
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 0;
ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_END, &ping_host->addr, ping_host->addr_len, ping_host->seq, &tv, ping_host->userptr);
}
return _fast_ping_host_put(ping_host);
} }
static int _fast_ping_sendping_v6(struct ping_host_struct *ping_host) static int _fast_ping_sendping_v6(struct ping_host_struct *ping_host)
@@ -591,6 +595,7 @@ struct ping_host_struct *fast_ping_start(const char *host, int count, int interv
ping_host->type = type; ping_host->type = type;
ping_host->userptr = userptr; ping_host->userptr = userptr;
atomic_set(&ping_host->ref, 0); atomic_set(&ping_host->ref, 0);
atomic_set(&ping_host->notified, 0);
ping_host->sid = atomic_inc_return(&ping_sid); ping_host->sid = atomic_inc_return(&ping_sid);
if (ping_callback) { if (ping_callback) {
ping_host->ping_callback = ping_callback; ping_host->ping_callback = ping_callback;
@@ -607,9 +612,6 @@ struct ping_host_struct *fast_ping_start(const char *host, int count, int interv
memcpy(&ping_host->addr, gai->ai_addr, gai->ai_addrlen); memcpy(&ping_host->addr, gai->ai_addr, gai->ai_addrlen);
tlog(TLOG_DEBUG, "ping %s, id = %d", host, ping_host->sid); tlog(TLOG_DEBUG, "ping %s, id = %d", host, ping_host->sid);
if (_fast_ping_sendping(ping_host) != 0) {
goto errout;
}
hostkey = hash_string(ping_host->host); hostkey = hash_string(ping_host->host);
addrkey = jhash(&ping_host->addr, ping_host->addr_len, 0); addrkey = jhash(&ping_host->addr, ping_host->addr_len, 0);
@@ -617,12 +619,23 @@ struct ping_host_struct *fast_ping_start(const char *host, int count, int interv
pthread_mutex_lock(&ping.map_lock); pthread_mutex_lock(&ping.map_lock);
_fast_ping_host_get(ping_host); _fast_ping_host_get(ping_host);
hash_add(ping.hostmap, &ping_host->host_node, hostkey); hash_add(ping.hostmap, &ping_host->host_node, hostkey);
pthread_mutex_unlock(&ping.map_lock);
if (_fast_ping_sendping(ping_host) != 0) {
goto errout1;
}
freeaddrinfo(gai);
pthread_mutex_lock(&ping.map_lock);
_fast_ping_host_get(ping_host);
hash_add(ping.addrmap, &ping_host->addr_node, addrkey); hash_add(ping.addrmap, &ping_host->addr_node, addrkey);
pthread_mutex_unlock(&ping.map_lock); pthread_mutex_unlock(&ping.map_lock);
freeaddrinfo(gai);
return ping_host; return ping_host;
errout1:
pthread_mutex_lock(&ping.map_lock);
hash_del(&ping_host->host_node);
pthread_mutex_unlock(&ping.map_lock);
errout: errout:
if (gai) { if (gai) {
freeaddrinfo(gai); freeaddrinfo(gai);
@@ -795,7 +808,7 @@ static int _fast_ping_process_icmp(struct ping_host_struct *ping_host, struct ti
recv_ping_host->send = 0; recv_ping_host->send = 0;
if (recv_ping_host->count == 1) { if (recv_ping_host->count == 1) {
_fast_ping_host_put(recv_ping_host); _fast_ping_host_remove(recv_ping_host);
} }
return 0; return 0;
@@ -820,8 +833,8 @@ static int _fast_ping_process_tcp(struct ping_host_struct *ping_host, struct epo
} }
tv_sub(&tvresult, tvsend); tv_sub(&tvresult, tvsend);
if (ping_host->ping_callback) { if (ping_host->ping_callback) {
ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_RESPONSE, &ping_host->addr, ping_host->addr_len, ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_RESPONSE, &ping_host->addr, ping_host->addr_len, ping_host->seq, &tvresult,
ping_host->seq, &tvresult, ping_host->userptr); ping_host->userptr);
} }
ping_host->send = 0; ping_host->send = 0;
@@ -832,7 +845,7 @@ static int _fast_ping_process_tcp(struct ping_host_struct *ping_host, struct epo
} }
if (ping_host->count == 1) { if (ping_host->count == 1) {
_fast_ping_host_put(ping_host); _fast_ping_host_remove(ping_host);
} }
return 0; return 0;
errout: errout:
@@ -871,9 +884,28 @@ static void _fast_ping_period_run()
struct timeval interval; struct timeval interval;
int64_t millisecond; int64_t millisecond;
gettimeofday(&now, 0); gettimeofday(&now, 0);
LIST_HEAD(action);
pthread_mutex_lock(&ping.map_lock); pthread_mutex_lock(&ping.map_lock);
hash_for_each_safe(ping.addrmap, i, tmp, ping_host, addr_node) hash_for_each_safe(ping.addrmap, i, tmp, ping_host, addr_node)
{
interval = now;
tv_sub(&interval, &ping_host->last);
millisecond = interval.tv_sec * 1000 + interval.tv_usec / 1000;
if (millisecond >= ping_host->timeout && ping_host->send == 1) {
list_add_tail(&ping_host->action_list, &action);
continue;
}
if (millisecond < ping_host->interval) {
continue;
}
list_add_tail(&ping_host->action_list, &action);
}
pthread_mutex_unlock(&ping.map_lock);
list_for_each_entry(ping_host, &action, action_list)
{ {
interval = now; interval = now;
tv_sub(&interval, &ping_host->last); tv_sub(&interval, &ping_host->last);
@@ -890,9 +922,7 @@ static void _fast_ping_period_run()
if (ping_host->count > 0) { if (ping_host->count > 0) {
if (ping_host->count == 1) { if (ping_host->count == 1) {
hash_del(&ping_host->host_node); _fast_ping_host_remove(ping_host);
hash_del(&ping_host->addr_node);
_fast_ping_host_put_locked(ping_host, 1);
continue; continue;
} }
ping_host->count--; ping_host->count--;
@@ -900,7 +930,6 @@ static void _fast_ping_period_run()
_fast_ping_sendping(ping_host); _fast_ping_sendping(ping_host);
} }
pthread_mutex_unlock(&ping.map_lock);
} }
static void *_fast_ping_work(void *arg) static void *_fast_ping_work(void *arg)
@@ -908,9 +937,9 @@ static void *_fast_ping_work(void *arg)
struct epoll_event events[PING_MAX_EVENTS + 1]; struct epoll_event events[PING_MAX_EVENTS + 1];
int num; int num;
int i; int i;
struct timeval last = { 0 }; struct timeval last = {0};
struct timeval now = { 0 }; struct timeval now = {0};
struct timeval diff = { 0 }; struct timeval diff = {0};
uint millisec = 0; uint millisec = 0;
while (ping.run) { while (ping.run) {