之前这篇文章dispatch_semaphore搬运源码搬了dispatch_semaphore相关的方法,后来又看了看group相关的,semaphore和group的源码大多都在同一个文件,所以处理逻辑也有很多相似的地方。
dispatch_group_t的结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| struct dispatch_group_s { OS_OBJECT_STRUCT_HEADER(dispatch_group); struct dispatch_group_s *volatile do_next; struct dispatch_queue_s *do_targetq; void *do_ctxt; void *do_finalizer; long volatile dg_value; struct dispatch_continuation_s *volatile dg_notify_head; struct dispatch_continuation_s *volatile dg_notify_tail; sem_t dg_sem; };
|
关键的有三个变量,dg_value、dg_notify_head、dg_notify_tail;
- dg_value 是当前正在执行的任务数量
- dg_notify_head 是排队等待notify的队首
- dg_notify_tail 是排队等待notify的队尾
dispatch_group_create
1 2 3 4 5 6 7 8 9 10 11 12
| dispatch_group_t dispatch_group_create(void) { return _dispatch_group_create_with_count(0); dispatch_group_t dg = (dispatch_group_t)_dispatch_alloc(DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s)); dg->do_next = DISPATCH_OBJECT_LISTLESS; dg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, false); dg->dsema_value = value; int ret = sem_init(&dg->dsema_sem, 0, 0); DISPATCH_SEMAPHORE_VERIFY_RET(ret); return dg; }
|
主要是给 dsema_value 赋值。
dispatch_group_async
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db) { dispatch_continuation_t dc = _dispatch_continuation_alloc(); dc->dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT | DISPATCH_OBJ_BLOCK_BIT; dc->dc_ctxt = _dispatch_Block_copy(db); dc->dc_func = _dispatch_call_block_and_release; dispatch_group_enter(dg); dc->dc_data = dg; dq = dq->do_targetq; while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) { if (!_dispatch_queue_try_acquire_async(dq)) break; dq = dq->do_targetq; } if (_dispatch_queue_push_update_tail(dq, dc)) { _dispatch_queue_push_update_head(dq, tail, true); dispatch_wakeup_flags_t flags = DISPATCH_WAKEUP_CONSUME | DISPATCH_WAKEUP_FLUSH; (&dq->do_vtable->_os_obj_vtable)->dx_wakeup(dq, pp, 0) } }
|
就是创建一个任务,调用 dispatch_group_enter(dg) 在 group 中标记有一个正在执行的任务,然后将 group 赋值到任务的 dc->dc_data 中。最后的执行和 dispatch_async 一样。
不同的是,我们可以看到在 dispatch_group_async 中被创建的任务添加了 DISPATCH_OBJ_GROUP_BIT 的标记。有了这个标记后,任务在 dispatch_async 执行时的不同如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| static inline void _dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov, dispatch_invoke_flags_t flags) { dispatch_continuation_t dc = dou._dc, dc1; dispatch_invoke_with_autoreleasepool(flags, { dc1 = _dispatch_continuation_free_cacheonly(dc); if (dc->dc_flags & DISPATCH_OBJ_GROUP_BIT) { _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); _dispatch_introspection_queue_item_complete(dc->dc_data); dispatch_group_leave((dispatch_group_t)dc->dc_data); } else { } _dispatch_continuation_free_to_cache_limit(dc1); }); }
|
可以发现最大的不同在 if (dc->dc_flags & DISPATCH_OBJ_GROUP_BIT) 条件中,在执行完 async 任务后,会调用一下 dispatch_group_leave 在 group 中标记有一个任务完成了。
dispatch_group_notify
1 2 3 4 5 6 7 8 9
| void dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db) { dispatch_continuation_t dsn = _dispatch_continuation_alloc(); dsn->dc_flags = DISPATCH_OBJ_CONSUME_BIT; dsn->dc_ctxt = _dispatch_Block_copy(db); _dispatch_group_notify(dg, dq, dsn); }
|
在等待列表中添加一个需要被通知的任务,更新一下 dg_notify_tail 队尾,同时如果有必要,还会再更新一下队首 dg_notify_head。
dispatch_group_enter & dispatch_group_leave
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| void dispatch_group_enter(dispatch_group_t dg) { long value = os_atomic_inc_orig2o(dg, dg_value, acquire); } void dispatch_group_leave(dispatch_group_t dg) { long value = os_atomic_dec2o(dg, dg_value, release); if (value == 0) { long rval = (long)os_atomic_xchg2o(dg, dg_waiters, 0, relaxed); do { int ret = sem_post(&dg->dg_sem); DISPATCH_SEMAPHORE_VERIFY_RET(ret); } while (--rval); dispatch_continuation_t next, head, tail = NULL; head = os_atomic_xchg2o(dg, dg_notify_head, NULL, relaxed); tail = os_atomic_xchg2o(dg, dg_notify_tail, NULL, release); do { next = os_mpsc_pop_snapshot_head(head, tail, do_next); dispatch_queue_t dsn_queue = (dispatch_queue_t)head->dc_data; _dispatch_continuation_async(dsn_queue, head); _dispatch_release(dsn_queue); } while ((head = next)); _dispatch_release(dg); } }
|
dispatch_group_enter 方法对 dg_value 进行了加1操作。
对应的 dispatch_group_leave 方法会对 dg_value 进行减1操作。
减1之后如果发现 group 中没有正在执行的任务了,既 dg_value 为0了,那么就去唤醒 dispatch_group_wait 等待的地方,同时执行需要notify的任务。
dispatch_group_wait
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) { if (dg->dg_value == 0) { return 0; } if (timeout == 0) { errno = ETIMEDOUT; return (-1); } return _dispatch_group_wait_slow(dg, timeout); } static long _dispatch_group_wait_slow(dispatch_group_t dg, dispatch_time_t timeout) { long value; int orig_waiters; struct timespec _timeout; int ret; (void)os_atomic_inc2o(dg, dg_waiters, relaxed); switch (timeout) { default: do { uint64_t nsec = _dispatch_time_nanoseconds_since_epoch(timeout); _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC); _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC); ret = sem_timedwait(&dg->dg_sem, &_timeout); } while (ret == -1 && errno == EINTR); if (!(ret == -1 && errno == ETIMEDOUT)) { break; } case DISPATCH_TIME_NOW: orig_waiters = dg->dg_waiters; while (orig_waiters) { if (os_atomic_cmpxchgvw2o(dg, dg_waiters, orig_waiters, orig_waiters - 1, &orig_waiters, relaxed)) { errno = ETIMEDOUT; return -1; } } case DISPATCH_TIME_FOREVER: do { ret = sem_wait(&dg->dg_sem); } while (ret == -1 && errno == EINTR); break; } return 0; }
|
如果 group 中当前没有在执行的任务,就直接返回成功了;
然后检查如果不等待,直接返回超时;
否则进入等待流程,这个等待流程和 dispatch_semaphore_wait 非常相似。
对源码的翻译有误希望大家使劲拍。