From f619ca8f6862c7d3d5b19020ee9408fb4f482dfb Mon Sep 17 00:00:00 2001 From: Nick Peng Date: Wed, 16 Nov 2022 00:20:39 +0800 Subject: [PATCH] ping: Optimize the ping callback thread to reduce inaccurate results caused by blocking --- src/fast_ping.c | 176 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 159 insertions(+), 17 deletions(-) diff --git a/src/fast_ping.c b/src/fast_ping.c index c154545..8e187ad 100644 --- a/src/fast_ping.c +++ b/src/fast_ping.c @@ -19,6 +19,7 @@ #include "fast_ping.h" #include "atomic.h" #include "hashtable.h" +#include "list.h" #include "tlog.h" #include "util.h" #include @@ -37,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +129,15 @@ struct ping_host_struct { struct fast_ping_packet packet; }; +struct fast_ping_notify_event { + struct list_head list; + struct ping_host_struct *ping_host; + FAST_PING_RESULT ping_result; + unsigned int seq; + int ttl; + struct timeval tvresult; +}; + struct fast_ping_struct { atomic_t run; pthread_t tid; @@ -145,6 +156,10 @@ struct fast_ping_struct { struct ping_host_struct udp6_host; int event_fd; + pthread_t notify_tid; + pthread_cond_t notify_cond; + pthread_mutex_t notify_lock; + struct list_head notify_event_list; pthread_mutex_t map_lock; DECLARE_HASHTABLE(addrmap, 6); @@ -154,6 +169,8 @@ static struct fast_ping_struct ping; static atomic_t ping_sid = ATOMIC_INIT(0); static int bool_print_log = 1; +static void _fast_ping_host_put(struct ping_host_struct *ping_host); + static void _fast_ping_wakup_thread(void) { uint64_t u = 1; @@ -378,6 +395,53 @@ static void _fast_ping_close_host_sock(struct ping_host_struct *ping_host) ping_host->fd = -1; } +static void _fast_ping_release_notify_event(struct fast_ping_notify_event *ping_notify_event) +{ + pthread_mutex_lock(&ping.notify_lock); + list_del_init(&ping_notify_event->list); + pthread_mutex_unlock(&ping.notify_lock); + + if (ping_notify_event->ping_host) { + _fast_ping_host_put(ping_notify_event->ping_host); + ping_notify_event->ping_host = NULL; + } + free(ping_notify_event); +} + +static int _fast_ping_send_notify_event(struct ping_host_struct *ping_host, FAST_PING_RESULT ping_result, + unsigned int seq, int ttl, struct timeval *tvresult) +{ + struct fast_ping_notify_event *notify_event = NULL; + + notify_event = malloc(sizeof(struct fast_ping_notify_event)); + if (notify_event == NULL) { + goto errout; + } + memset(notify_event, 0, sizeof(struct fast_ping_notify_event)); + INIT_LIST_HEAD(¬ify_event->list); + notify_event->seq = seq; + notify_event->ttl = ttl; + notify_event->ping_result = ping_result; + notify_event->tvresult = *tvresult; + + pthread_mutex_lock(&ping.notify_lock); + if (list_empty(&ping.notify_event_list)) { + pthread_cond_signal(&ping.notify_cond); + } + list_add_tail(¬ify_event->list, &ping.notify_event_list); + notify_event->ping_host = ping_host; + _fast_ping_host_get(ping_host); + pthread_mutex_unlock(&ping.notify_lock); + + return 0; + +errout: + if (notify_event) { + _fast_ping_release_notify_event(notify_event); + } + return -1; +} + static void _fast_ping_host_put(struct ping_host_struct *ping_host) { int ref_cnt = atomic_dec_and_test(&ping_host->ref); @@ -400,8 +464,7 @@ static void _fast_ping_host_put(struct ping_host_struct *ping_host) tv.tv_sec = 0; tv.tv_usec = 0; - ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_END, &ping_host->addr, ping_host->addr_len, - ping_host->seq, ping_host->ttl, &tv, ping_host->error, ping_host->userptr); + _fast_ping_send_notify_event(ping_host, PING_RESULT_END, ping_host->seq, ping_host->ttl, &tv); } tlog(TLOG_DEBUG, "ping %s end, id %d", ping_host->host, ping_host->sid); @@ -427,8 +490,7 @@ static void _fast_ping_host_remove(struct ping_host_struct *ping_host) tv.tv_sec = 0; tv.tv_usec = 0; - ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_END, &ping_host->addr, ping_host->addr_len, - ping_host->seq, ping_host->ttl, &tv, ping_host->error, ping_host->userptr); + _fast_ping_send_notify_event(ping_host, PING_RESULT_END, ping_host->seq, ping_host->ttl, &tv); } _fast_ping_host_put(ping_host); @@ -664,6 +726,8 @@ static int _fast_ping_sendping(struct ping_host_struct *ping_host) if (ret != 0) { ping_host->error = errno; return ret; + } else { + ping_host->error = 0; } return 0; @@ -1374,9 +1438,8 @@ static int _fast_ping_process_icmp(struct ping_host_struct *ping_host, struct ti recv_ping_host->ttl = packet->ttl; tv_sub(&tvresult, tvsend); if (recv_ping_host->ping_callback) { - recv_ping_host->ping_callback(recv_ping_host, recv_ping_host->host, PING_RESULT_RESPONSE, &recv_ping_host->addr, - recv_ping_host->addr_len, recv_ping_host->seq, recv_ping_host->ttl, &tvresult, - ping_host->error, recv_ping_host->userptr); + _fast_ping_send_notify_event(recv_ping_host, PING_RESULT_RESPONSE, recv_ping_host->seq, recv_ping_host->ttl, + &tvresult); } recv_ping_host->send = 0; @@ -1409,9 +1472,7 @@ static int _fast_ping_process_tcp(struct ping_host_struct *ping_host, struct epo } tv_sub(&tvresult, tvsend); if (ping_host->ping_callback) { - ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_RESPONSE, &ping_host->addr, - ping_host->addr_len, ping_host->seq, ping_host->ttl, &tvresult, ping_host->error, - ping_host->userptr); + _fast_ping_send_notify_event(ping_host, PING_RESULT_RESPONSE, ping_host->seq, ping_host->ttl, &tvresult); } ping_host->send = 0; @@ -1505,9 +1566,8 @@ static int _fast_ping_process_udp(struct ping_host_struct *ping_host, struct tim tvsend = &recv_ping_host->last; tv_sub(&tvresult, tvsend); if (recv_ping_host->ping_callback) { - recv_ping_host->ping_callback(recv_ping_host, recv_ping_host->host, PING_RESULT_RESPONSE, &recv_ping_host->addr, - recv_ping_host->addr_len, recv_ping_host->seq, recv_ping_host->ttl, &tvresult, - ping_host->error, recv_ping_host->userptr); + _fast_ping_send_notify_event(recv_ping_host, PING_RESULT_RESPONSE, recv_ping_host->seq, recv_ping_host->ttl, + &tvresult); } recv_ping_host->send = 0; @@ -1614,9 +1674,7 @@ static void _fast_ping_period_run(void) tv_sub(&interval, &ping_host->last); millisecond = interval.tv_sec * 1000 + interval.tv_usec / 1000; if (millisecond >= ping_host->timeout && ping_host->send == 1) { - ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_TIMEOUT, &ping_host->addr, - ping_host->addr_len, ping_host->seq, ping_host->ttl, &interval, ping_host->error, - ping_host->userptr); + _fast_ping_send_notify_event(ping_host, PING_RESULT_TIMEOUT, ping_host->seq, ping_host->ttl, &interval); ping_host->send = 0; } @@ -1642,6 +1700,56 @@ static void _fast_ping_period_run(void) } } +static void _fast_ping_process_notify_event(struct fast_ping_notify_event *ping_notify_event) +{ + struct ping_host_struct *ping_host = ping_notify_event->ping_host; + if (ping_host == NULL) { + return; + } + + ping_host->ping_callback(ping_host, ping_host->host, ping_notify_event->ping_result, &ping_host->addr, + ping_host->addr_len, ping_notify_event->seq, ping_notify_event->ttl, + &ping_notify_event->tvresult, ping_host->error, ping_host->userptr); +} + +static void *_fast_ping_notify_worker(void *arg) +{ + struct fast_ping_notify_event *ping_notify_event = NULL; + + while (atomic_read(&ping.run)) { + pthread_mutex_lock(&ping.notify_lock); + if (list_empty(&ping.notify_event_list)) { + pthread_cond_wait(&ping.notify_cond, &ping.notify_lock); + } + + ping_notify_event = list_first_entry_or_null(&ping.notify_event_list, struct fast_ping_notify_event, list); + if (ping_notify_event) { + list_del_init(&ping_notify_event->list); + } + pthread_mutex_unlock(&ping.notify_lock); + + if (ping_notify_event == NULL) { + continue; + } + + _fast_ping_process_notify_event(ping_notify_event); + _fast_ping_release_notify_event(ping_notify_event); + } + + return NULL; +} + +static void _fast_ping_remove_all_notify_event(void) +{ + struct fast_ping_notify_event *notify_event = NULL; + struct fast_ping_notify_event *tmp = NULL; + list_for_each_entry_safe(notify_event, tmp, &ping.notify_event_list, list) + { + _fast_ping_process_notify_event(notify_event); + _fast_ping_release_notify_event(notify_event); + } +} + static void *_fast_ping_work(void *arg) { struct epoll_event events[PING_MAX_EVENTS + 1]; @@ -1767,13 +1875,25 @@ int fast_ping_init(void) pthread_mutex_init(&ping.map_lock, NULL); pthread_mutex_init(&ping.lock, NULL); + pthread_mutex_init(&ping.notify_lock, NULL); + pthread_cond_init(&ping.notify_cond, NULL); + + INIT_LIST_HEAD(&ping.notify_event_list); + hash_init(ping.addrmap); ping.no_unprivileged_ping = !has_unprivileged_ping(); ping.ident = (getpid() & 0XFFFF); atomic_set(&ping.run, 1); + ret = pthread_create(&ping.tid, &attr, _fast_ping_work, NULL); if (ret != 0) { - tlog(TLOG_ERROR, "create ping work thread failed, %s\n", strerror(errno)); + tlog(TLOG_ERROR, "create ping work thread failed, %s\n", strerror(ret)); + goto errout; + } + + ret = pthread_create(&ping.notify_tid, &attr, _fast_ping_notify_worker, NULL); + if (ret != 0) { + tlog(TLOG_ERROR, "create ping notifyer work thread failed, %s\n", strerror(ret)); goto errout; } @@ -1786,9 +1906,18 @@ int fast_ping_init(void) return 0; errout: + if (ping.notify_tid) { + void *retval = NULL; + atomic_set(&ping.run, 0); + pthread_cond_signal(&ping.notify_cond); + pthread_join(ping.notify_tid, &retval); + ping.notify_tid = 0; + } + if (ping.tid) { void *retval = NULL; atomic_set(&ping.run, 0); + _fast_ping_wakup_thread(); pthread_join(ping.tid, &retval); ping.tid = 0; } @@ -1802,6 +1931,8 @@ errout: ping.event_fd = -1; } + pthread_cond_destroy(&ping.notify_cond); + pthread_mutex_destroy(&ping.notify_lock); pthread_mutex_destroy(&ping.lock); pthread_mutex_destroy(&ping.map_lock); memset(&ping, 0, sizeof(ping)); @@ -1834,6 +1965,14 @@ static void _fast_ping_close_fds(void) void fast_ping_exit(void) { + if (ping.notify_tid) { + void *retval = NULL; + atomic_set(&ping.run, 0); + pthread_cond_signal(&ping.notify_cond); + pthread_join(ping.notify_tid, &retval); + ping.notify_tid = 0; + } + if (ping.tid) { void *ret = NULL; atomic_set(&ping.run, 0); @@ -1849,7 +1988,10 @@ void fast_ping_exit(void) _fast_ping_close_fds(); _fast_ping_remove_all(); + _fast_ping_remove_all_notify_event(); + pthread_cond_destroy(&ping.notify_cond); + pthread_mutex_destroy(&ping.notify_lock); pthread_mutex_destroy(&ping.lock); pthread_mutex_destroy(&ping.map_lock); }