dispatch_group搬运源码

之前这篇文章dispatch_semaphore搬运源码搬了dispatch_semaphore相关的方法,后来又看了看group相关的,semaphore和group的源码大多都在同一个文件,所以处理逻辑也有很多相似的地方。


dispatch_group_t的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct dispatch_group_s {
// 引用计数之类的基础变量
OS_OBJECT_STRUCT_HEADER(dispatch_group);
// dispatch基础类的共有变量
struct dispatch_group_s *volatile do_next;
struct dispatch_queue_s *do_targetq;
void *do_ctxt;
void *do_finalizer;
// group自有变量
long volatile dg_value; // 正在执行的任务数量
struct dispatch_continuation_s *volatile dg_notify_head;
struct dispatch_continuation_s *volatile dg_notify_tail;
sem_t dg_sem;
};

关键的有三个变量,dg_valuedg_notify_headdg_notify_tail

  • dg_value 是当前正在执行的任务数量
  • dg_notify_head 是排队等待notify的队首
  • dg_notify_tail 是排队等待notify的队尾

dispatch_group_create

1
2
3
4
5
6
7
8
9
10
11
12
dispatch_group_t dispatch_group_create(void) {
return _dispatch_group_create_with_count(0);
dispatch_group_t dg = (dispatch_group_t)_dispatch_alloc(DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, false);
dg->dsema_value = value;
int ret = sem_init(&dg->dsema_sem, 0, 0);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
return dg;
}

主要是给 dsema_value 赋值。


dispatch_group_async

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db) {
// 创建任务对象
dispatch_continuation_t dc = _dispatch_continuation_alloc();
dc->dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT | DISPATCH_OBJ_BLOCK_BIT;
dc->dc_ctxt = _dispatch_Block_copy(db);
dc->dc_func = _dispatch_call_block_and_release;
dispatch_group_enter(dg);
dc->dc_data = dg;
// 下面的操作都和 dispatch_async 相同
// 找到转发的queue
dq = dq->do_targetq;
while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
if (!_dispatch_queue_try_acquire_async(dq)) break;
dq = dq->do_targetq;
}
// 更新队列尾部,和dispatch_barrier_async相同->_dispatch_continuation_push(dq, dc);
if (_dispatch_queue_push_update_tail(dq, dc)) { // 如果队列为空,会return ture
// 如果队列为空,那么更新队尾后队列头部也要更新一下
_dispatch_queue_push_update_head(dq, tail, true);
dispatch_wakeup_flags_t flags = DISPATCH_WAKEUP_CONSUME | DISPATCH_WAKEUP_FLUSH;
// dx_wakeup 会调用队列的_dispatch_queue_wakeup函数指针,每个队列有自己不同的处理,见do_weakeup.c
(&dq->do_vtable->_os_obj_vtable)->dx_wakeup(dq, pp, 0)
}
}

就是创建一个任务,调用 dispatch_group_enter(dg)group 中标记有一个正在执行的任务,然后将 group 赋值到任务的 dc->dc_data 中。最后的执行和 dispatch_async 一样。

不同的是,我们可以看到在 dispatch_group_async 中被创建的任务添加了 DISPATCH_OBJ_GROUP_BIT 的标记。有了这个标记后,任务在 dispatch_async 执行时的不同如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 对于标记有 DISPATCH_OBJ_GROUP_BIT 的任务,在执行的invoke方法中会稍有不同
static inline void _dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov, dispatch_invoke_flags_t flags) {
dispatch_continuation_t dc = dou._dc, dc1;
dispatch_invoke_with_autoreleasepool(flags, {
// 先将任务加入缓存,这样在回调的的时候效率会更高
dc1 = _dispatch_continuation_free_cacheonly(dc);
// group的情况
if (dc->dc_flags & DISPATCH_OBJ_GROUP_BIT) {
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_introspection_queue_item_complete(dc->dc_data);
dispatch_group_leave((dispatch_group_t)dc->dc_data);
} else {
// ...
/* 其它情况 */
// ...
}
// 清除缓存
_dispatch_continuation_free_to_cache_limit(dc1);
});
}

可以发现最大的不同在 if (dc->dc_flags & DISPATCH_OBJ_GROUP_BIT) 条件中,在执行完 async 任务后,会调用一下 dispatch_group_leavegroup 中标记有一个任务完成了。


dispatch_group_notify

1
2
3
4
5
6
7
8
9
void dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db) {
// 创建任务对象
dispatch_continuation_t dsn = _dispatch_continuation_alloc();
dsn->dc_flags = DISPATCH_OBJ_CONSUME_BIT;
dsn->dc_ctxt = _dispatch_Block_copy(db);
// 更新一下 dg 的 dg_notify_tail 列表和 dg_notify_head 列表
_dispatch_group_notify(dg, dq, dsn);
}

在等待列表中添加一个需要被通知的任务,更新一下 dg_notify_tail 队尾,同时如果有必要,还会再更新一下队首 dg_notify_head


dispatch_group_enter & dispatch_group_leave

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void dispatch_group_enter(dispatch_group_t dg) {
// 对 dg_value 进行自增就OK了
long value = os_atomic_inc_orig2o(dg, dg_value, acquire);
}
void dispatch_group_leave(dispatch_group_t dg) {
// 对 dg_value 进行自减
long value = os_atomic_dec2o(dg, dg_value, release);
// 如果减到0了,需要唤醒和执行了
if (value == 0) {
// 唤醒所有用dispatch_group_wait同步等待的地方
long rval = (long)os_atomic_xchg2o(dg, dg_waiters, 0, relaxed);
do {
int ret = sem_post(&dg->dg_sem);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
} while (--rval);
// 执行所有notify列表里等待的任务
dispatch_continuation_t next, head, tail = NULL;
head = os_atomic_xchg2o(dg, dg_notify_head, NULL, relaxed);
tail = os_atomic_xchg2o(dg, dg_notify_tail, NULL, release);
do {
next = os_mpsc_pop_snapshot_head(head, tail, do_next);
dispatch_queue_t dsn_queue = (dispatch_queue_t)head->dc_data;
// 并发执行notify列表中等待的任务, 和 dispatch_async 一样
_dispatch_continuation_async(dsn_queue, head);
_dispatch_release(dsn_queue);
} while ((head = next));
_dispatch_release(dg);
}
}

dispatch_group_enter 方法对 dg_value 进行了加1操作。

对应的 dispatch_group_leave 方法会对 dg_value 进行减1操作。

减1之后如果发现 group 中没有正在执行的任务了,既 dg_value 为0了,那么就去唤醒 dispatch_group_wait 等待的地方,同时执行需要notify的任务。


dispatch_group_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) {
if (dg->dg_value == 0) {
return 0;
}
if (timeout == 0) {
errno = ETIMEDOUT;
return (-1);
}
// 开始等待,超时之前在while循环中不停的检查dg->dg_sem,直到超时退出或得到信号
return _dispatch_group_wait_slow(dg, timeout);
}
// 这个方法和 semaphore 的 wait 类似
static long _dispatch_group_wait_slow(dispatch_group_t dg, dispatch_time_t timeout) {
long value;
int orig_waiters;
struct timespec _timeout;
int ret;
(void)os_atomic_inc2o(dg, dg_waiters, relaxed);
switch (timeout) {
default:
// 等到超时
do {
uint64_t nsec = _dispatch_time_nanoseconds_since_epoch(timeout);
_timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
_timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
ret = sem_timedwait(&dg->dg_sem, &_timeout);
} while (ret == -1 && errno == EINTR);
// 超时之前等到资源了
if (!(ret == -1 && errno == ETIMEDOUT)) {
break;
}
case DISPATCH_TIME_NOW:
orig_waiters = dg->dg_waiters;
while (orig_waiters) {
// 超时之前没等到资源,超时返回
if (os_atomic_cmpxchgvw2o(dg, dg_waiters, orig_waiters, orig_waiters - 1, &orig_waiters, relaxed)) {
errno = ETIMEDOUT;
return -1;
}
}
case DISPATCH_TIME_FOREVER:
// 一直等
do {
ret = sem_wait(&dg->dg_sem);
} while (ret == -1 && errno == EINTR);
break;
}
return 0;
}

如果 group 中当前没有在执行的任务,就直接返回成功了;

然后检查如果不等待,直接返回超时;

否则进入等待流程,这个等待流程和 dispatch_semaphore_wait 非常相似。


对源码的翻译有误希望大家使劲拍。


iOS Objective-C libdispatch GCD