UNP_26_线程

Posted on Mar 26, 2020

26.1 概念

  在 Unix 上使用 fork 来创建子进程一直以来有一些问题:

  • fork 是昂贵的。fork 要把父进程的内存映像复制到子进程,并在子进程中复制所有描述符,等等。现在的实现是 copy-on-write 技术,用以避免在子进程切实需要自己的副本之前把父进程的数据空间复制到子进程,但是就算如此,fork 依旧是昂贵的。
  • fork 返回之后父子进程之间信息的传递需要进程间通信 IPC 机制。调用 fork 之前父进程尚未存在的子进程传递信息相当容易,因为子进程将从父进程数据空间及所有描述符的一个副本开始运行。然而从子进程中返回数据较为繁琐。

  线程有助于解决这两个问题。线程有时称为轻权进程,因为线程比进程权重较轻。也就是说线程创建比进程创建快 10 ~ 100 倍。
  统一进程内的所有线程共享相同的全局内存。这使得线程之间易于共享信息,然而伴随着这种简易性而来的确是同步问题。

统一进程内的所有线程共享的资源如下:

  • 全局变量
  • 进程指令
  • 大多数数据
  • 描述符
  • 信号处理函数和信号处置
  • 当前的工作目录
  • 用户 ID 和组 ID

每个线程单独使用的有:

  • 线程 ID
  • 寄存器集合,包括程序计数器和栈指针
  • 栈(存放局部变量和返回地址)
  • errno
  • 信号掩码
  • 优先级

26.2 基本线程函数:创建和终止

本节包括 5 个基本线程函数(老朋友)

pthread_create 函数

  当一个程序由 exec 启动执行时,称为初始线程主线程的单个线程就创建了。其余线程就由 pthread_create 创建。

#include<pthread.h>
int pthread_create(pthread_t *tid, const pthread_attr_t * attr, void*(*func)(void *), void *arg);
// 返回:成功返回 0,出错是正的 Exxx 值
  1. 一个进程内的线程都由一个线程 ID 标识,其数据类型为 pthread_t(unsigned int),如果新的线程创建,其 ID 就通过 tid 指针返回。
  2. 每个线程都有许多属性:优先级,初始栈大小,是否该成为一个守护线程 等等。初始化时可以修改 pthread_attr_t 结构来设置这些属性。通常情况下采用默认设置,此时 attr 是一个空指针。
  3. 创建一个线程时,最后指定的是运行的函数以及参数。线程通过调用这个函数开始执行,然后或者显示的终止,或者隐士的终止。该函数的地址由 func 参数指定,唯一调用参数是指针 arg。如果我们需要给该函数传递多个参数,就需要把这些参数打包成一个结构,然后把这个结构的地址作为单个参数传递给这个起始函数。

  func 和 arg 的声明。func 所指函数作为参数接受一个通用指针 void* 有作为返回值返回一个 void* 指针。这使得我们可以把一个指针(指向任何内容)传递给线程,又允许线程返回一个指针(指向任何内容)
※ ***注意返回值出错全为正值,和 errno 不相同,且 pthread 不会设置 errno。各种 Exxx 参考<sys/errno.h>***

pthread_join 函数

  可以通过 pthread_join 等待一个线程终止。对比线程和进程。pthread_create 类似于 fork,pthread_join 类似于 waitpid

#include<pthread.h>
int pthread_join(pthread_t* tid, void** status);
// 返回:成功返回 0,出错是正的 Exxx 值

※ 我们必须要指定等待线程的 tid。但是 pthread 并不能等待任意一个线程(类似于使用 -1 作为进程 ID 调用 waitpid)
※ 如果 status != null,来自所等待线程的返回值(一个指向某个对象的指针)将存入由 status 指向的位置。

pthread_self 函数

  每个线程都有一个唯一的线程 ID。线程 ID 由 pthread_create 返回,而且我们已经看到 pthread_join 使用它,每个线程通过 pthread_self 获取自身的线程 ID

#include<pthread.h>
pthread_t pthread_self(void);
// 返回:调用线程的线程 ID

※ 对比进程,类似于 getpid

pthread_detach 函数

  一个线程或者是可汇合的(joinable,默认),或者是脱离的(detached)当一个 joinable 线程终止时,它的线程 ID 和推出状态将留存到另一个线程对它调用 pthread_join。脱离的线程却像守护线程,当他们终止时,所有资源会释放,我们不能等待其终止。如果一个线程需要知道另一个线程什么时候终止,那就最好保持第二个线程的 joinable 状态。

#include<pthread.h>
int pthread_detach(pthread_t tid);
// 返回:若成功则为 0,若出错则为正的 Exxx 值

本函数通常有想让自己脱离的线程调用:

pthread_detach(pthread_self());

pthread_exit 函数

让线程终止的方法之一是调用 pthread_exit

#include<pthread.h>
void pthread_exit(void* status);

※ 如果本线程没有脱离,它的线程 ID 和退出状态会一直留存到调用进程内的某个其他线程对它调用 pthread_join
※ 指针 status 不能指向调用线程的局部对象,因为终止时局部对象会消失。

让一个进程终止的方法:

  1. 调用 pthread_exit 函数
  2. 启动线程的函数(pthread_create)可以返回
  3. 如果进程的 main 函数返回或者任意线程调用了 exit,整个进程终止,包括其中的任何线程

26.3 使用线程的 str_cli 函数

把使用 fork 的 str_cli 改写成使用线程

void	*copyto(void *);

// 全局变量传参,简单一些
static int	sockfd;		/* global for both threads to access */
static FILE	*fp;

void
str_cli(FILE *fp_arg, int sockfd_arg)
{
	char		recvline[MAXLINE];
	pthread_t	tid;

	sockfd = sockfd_arg;	/* copy arguments to externals */
	fp = fp_arg;
    // 创建一个线程
	Pthread_create(&tid, NULL, copyto, NULL);
    // 主循环调用 readline 以及 fputs,把从套接字读入的每个文本行复制到标准输出
	while (Readline(sockfd, recvline, MAXLINE) > 0)
		Fputs(recvline, stdout);
}

// 线程执行函数
void * copyto(void *arg)
{
	char	sendline[MAXLINE];
    // 吧读入的数据写入到套接字
	while (Fgets(sendline, MAXLINE, fp) != NULL)
		Writen(sockfd, sendline, strlen(sendline));
    // 发送 FIN
	Shutdown(sockfd, SHUT_WR);	/* EOF on stdin, send FIN */
    // 返回,从启动线程的函数终止该线程,上面的第二种方式
	return(NULL);
		/* 4return (i.e., thread terminates) when EOF on stdin */
}

26.4 使用线程的 TCP 回射服务器程序

  把回射程序换成每个客户一个线程,而不是一个子进程。我们同样使用自己的 tcp_listen 函数使得该程序协议无关

static void	*doit(void *);		/* each thread executes this function */

int
main(int argc, char **argv)
{
	int				listenfd, connfd;
	pthread_t		tid;
	socklen_t		addrlen, len;
	struct sockaddr	*cliaddr;

	if (argc == 2)
		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
	else if (argc == 3)
		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
	else
		err_quit("usage: tcpserv01 [ <host> ] <service or port>");

	cliaddr = Malloc(addrlen);
	// 创建一个子线程为客户服务
	for ( ; ; ) {
		len = addrlen;
		connfd = Accept(listenfd, cliaddr, &len);
		Pthread_create(&tid, NULL, &doit, (void *) connfd);
	}
}
// 唯一的参数是描述符 connfd
static void * doit(void *arg)
{
	// 首先变为脱离状态,因为主线程没有必要等待它拆功能键的所有线程。之后调用 str_echo
	Pthread_detach(pthread_self());
	str_echo((int) arg);	/* same function as before */
	Close((int) arg);		/* done with connected socket */
	return(NULL);
}

※ 这里主线程不会进行描述符的 close,因为一个进程中的所有的线程共享所有描述符

给新线程传递参数

这里有一个点需要注意: Pthread_create(&tid, NULL, &doit, (void *) connfd);
如果使用: Pthread_create(&tid, NULL, &doit, &connfd);
  可能导致较短时间创建两个线程,参数是同一个指针所指的数据。所以我们通过把 connfd 的值传递给 pthread_create 就不会发生这种事,下面有更好的方式。

static void	*doit(void *);		/* each thread executes this function */

int
main(int argc, char **argv)
{
	int				listenfd, *iptr;
	thread_t		tid;
	socklen_t		addrlen, len;
	struct sockaddr	*cliaddr;

	if (argc == 2)
		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
	else if (argc == 3)
		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
	else
		err_quit("usage: tcpserv01 [ <host> ] <service or port>");

	cliaddr = Malloc(addrlen);

	for ( ; ; ) {
		len = addrlen;
		// 直接创建一个新的内存来进行传递
		iptr = Malloc(sizeof(int));
		*iptr = Accept(listenfd, cliaddr, &len);
		Pthread_create(&tid, NULL, &doit, iptr);
	}
}

static void *
doit(void *arg)
{
	int		connfd;
	// 这里需要注意释放,否则内存会泄露
	connfd = *((int *) arg);
	free(arg);

	Pthread_detach(pthread_self());
	str_echo(connfd);		/* same function as before */
	Close(connfd);			/* done with connected socket */
	return(NULL);
}

※ malloc 和 free 是不可以重入的。在主线程处于这两个函数之一时,从某个信号处理函数调用这两个函数之一可能导致灾难性后果,因为这两个函数操纵相同的静态数据,这里在库函数中就进行了同步互斥操作,对用户透明。

线程安全函数

所有 c 标准定义的函数是线程安全的,需要注意的是网络编程的函数并不一定是线程安全的。

26.5 线程特定数据 thread-specific data(TSD)

  把一个未线程化的程序转换成使用线程的版本时,有时会碰到其中函数使用静态数据引起的错误。错误结果不确定。当一个进程中的多个线程同时调用这个静态数据时就会产生问题,因为它们使用的是同一个值,并不是每个线程单独的值。解决办法如下:

  • 使用线程特定数据。这个版本转换成了只能在线程中使用的版本。
  • 改变调用顺序
  • 改变接口的结构,避免使用静态变量,这样函数就是线程安全的。

TSD 目的:

线程特定数据,也称为线程私有数据,是存储和查询某个特定数据相关数据的一种机制。

在单线程程序中,我们经常要用到“全局变量”以实现多个函数间共享数据。

在多线程环境下,由于数据空间是共享的,因此全局变量也为所有所有线程所共有。

但有时应用程序设计中有必要提供线程私有的全局变量,仅在某个线程中有效,但却可以跨多个函数访问。

相关函数

  在分配线程特定数据之前,需要创建与该数据关联的键。这个键将用于获取对线程特定数据的访问。使用 pthread_key_create函数创建一个键。

  int pthread_key_create(pthread_key_t *key, void (*destr_function) (void*));

  创建的键存储在pkey指向的内存单元中,这个键可以被进程中的所有线程使用,但每个线程与不同的线程特定数据地址相关联。创建新键时,每个线程的数据地址设为空值。
  除了创建键以外,pthread_key_create可以为该键关联一个可选择的析构函数。当这个线程退出时,如果数据地址已经被置为非空值,那么析构函数就会被调用,它唯一的参数就是该数据地址。如果传入的析构函数为空,就表明没有析构函数与这个键关联。
  线程通常使用malloc为线程特定数据分配内存。析构函数通常释放已分配的内存。
  对所有的线程,我们都可以通过调用pthread_key_delete函数来取消键与线程特定数据值之间的联系。

   int pthread_key_delete(pthread_key_t key);

  有些线程可能看到一个键值,而其他的线程看到的可能是另一个不同的键值,这取决于系统是如何调度线程的,解决这种竞争的办法是使用pthread_once函数 。

       pthread_once_t once_control = PTHREAD_ONCE_INIT;
       int  pthread_once(pthread_once_t  *once_control,  void  (*init_routine) (void));

  once_control必须是一个非本地变量(如全局变量或静态变量),而且必须初始化为PTHREAD_ONCE_INIT。如果每个线程都调用pthread_once,系统就能保证初始化once_control只被调用一次,即系统首次调用pthread_once时。
  键一旦创建以后,就可以通过调用pthread_setspecific函数把键和线程特定数据关联起来。可以通过pthread_getspecific获得线程特定数据的地址。  

int pthread_setspecific(pthread_key_t key, const void *pointer);
void * pthread_getspecific(pthread_key_t key);

  如果没有线程特定数据值与键关联,pthread_getspecific将返回一个空指针,我们可以用这个空指针确定是否需要调用pthread_setspecific。

26.6 web 客户与同时连接

  使用线程代替非阻塞 I/O 编写 web 客户程序例子。改用线程之后,我们可以让套接字停留在默认的阻塞模式,对每个连接创建一个线程。每个线程可以阻塞在它的 connect 调用中,因为内核会交替运行线程。

#define	MAXFILES	20
#define	SERV		"80"	/* port number or service name */

struct file {
  char	*f_name;			/* filename */
  char	*f_host;			/* hostname or IP address */
  int    f_fd;				/* descriptor */
  int	 f_flags;			/* F_xxx below */
  pthread_t	 f_tid;			/* thread ID */
} file[MAXFILES];
#define	F_CONNECTING	1	/* connect() in progress */
#define	F_READING		2	/* connect() complete; now reading */
#define	F_DONE			4	/* all done */

#define	GET_CMD		"GET %s HTTP/1.0\r\n\r\n"

int		nconn, nfiles, nlefttoconn, nlefttoread;

void	*do_get_read(void *);
void	home_page(const char *, const char *);
void	write_get_cmd(struct file *);

int
main(int argc, char **argv)
{
	int			i, n, maxnconn;
	pthread_t	tid;
	struct file	*fptr;

	if (argc < 5)
		err_quit("usage: web <#conns> <IPaddr> <homepage> file1 ...");
	maxnconn = atoi(argv[1]);

	nfiles = min(argc - 4, MAXFILES);
	for (i = 0; i < nfiles; i++) {
		file[i].f_name = argv[i + 4];
		file[i].f_host = argv[2];
		file[i].f_flags = 0;
	}
	printf("nfiles = %d\n", nfiles);

	home_page(argv[2], argv[3]);

	nlefttoread = nlefttoconn = nfiles;
	nconn = 0;

	while (nlefttoread > 0) {
		// 创建另一个线程,新线程执行的函数是 do_get_read,传递给它的参数是指向 file 结构的指针
		while (nconn < maxnconn && nlefttoconn > 0) {
				/* 4find a file to read */
			for (i = 0 ; i < nfiles; i++)
				if (file[i].f_flags == 0)
					break;
			if (i == nfiles)
				err_quit("nlefttoconn = %d but nothing found", nlefttoconn);

			file[i].f_flags = F_CONNECTING;
			Pthread_create(&tid, NULL, &do_get_read, &file[i]);
			file[i].f_tid = tid;
			nconn++;
			nlefttoconn--;
		}
		// 等待任何一个线程结束,使用的是 thr_join,因为 pthread 并不提供这种操作
		// 后面会进行改变,使用性能更好的 Pthread 操作
		if ( (n = thr_join(0, &tid, (void **) &fptr)) != 0)
			errno = n, err_sys("thr_join error");

		nconn--;
		nlefttoread--;
		printf("thread id %d for %s done\n", tid, fptr->f_name);
	}

	exit(0);
}

void *
do_get_read(void *vptr)
{
	int					fd, n;
	char				line[MAXLINE];
	struct file			*fptr;

	fptr = (struct file *) vptr;
	// 创建一个 TCP 连接,是一个阻塞的套接字,会一直阻塞在 connect
	fd = Tcp_connect(fptr->f_host, SERV);
	fptr->f_fd = fd;
	printf("do_get_read for %s, fd %d, thread %d\n",
			fptr->f_name, fd, fptr->f_tid);
	// 向服务器写入请求
	write_get_cmd(fptr);	/* write() the GET command */

		/* 4Read server's reply */
	// 读取服务器的应答
	for ( ; ; ) {
		if ( (n = Read(fd, line, MAXLINE)) == 0)
			break;		/* server closed connection */

		printf("read %d bytes from %s\n", n, fptr->f_name);
	}
	printf("end-of-file on %s\n", fptr->f_name);
	Close(fd);
	fptr->f_flags = F_DONE;		/* clears F_READING */
	// 终止线程
	return(fptr);		/* terminate thread */
}

void
write_get_cmd(struct file *fptr)
{
	int		n;
	char	line[MAXLINE];

	n = snprintf(line, sizeof(line), GET_CMD, fptr->f_name);
	Writen(fptr->f_fd, line, n);
	printf("wrote %d bytes for %s\n", n, fptr->f_name);

	fptr->f_flags = F_READING;			/* clears F_CONNECTING */
}

void
home_page(const char *host, const char *fname)
{
	int		fd, n;
	char	line[MAXLINE];

	fd = Tcp_connect(host, SERV);	/* blocking connect() */

	n = snprintf(line, sizeof(line), GET_CMD, fname);
	Writen(fd, line, n);

	for ( ; ; ) {
		if ( (n = Read(fd, line, MAXLINE)) == 0)
			break;		/* server closed connection */

		printf("read %d bytes of home page\n", n);
		/* do whatever with data */
	}
	printf("end-of-file on home page\n");
	Close(fd);
}

26.7 互斥锁

  同一段代码操作全局变量在多线程的情况下会发生错误。
  我们称线程编程为并发编程,在多线程的秦观管辖需要注意全局变量等可能会因为计算机的执行顺序所导致的错误。
  针对上面的问题可以使用互斥锁 mutex 保护这个全局变量进行解决。

#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t* mptr);
int pthread_mutex_unlock(pthread_mutex_t* mptr);
// 返回:成功返回 0,出错返回正值的 Exxx

※ 如果试图对一个已经被其它的线程上锁的互斥锁上锁,会阻塞住,直到互斥锁解锁。
※ 如果某个互斥锁变量是静态分配的,就需要初始化为 PTHREAD_MUTEX_INITIALIZER。

// 使用互斥锁的计数器
#define	NLOOP 5000

int				counter;		/* incremented by threads */
pthread_mutex_t	counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void	*doit(void *);

int
main(int argc, char **argv)
{
	pthread_t	tidA, tidB;

	Pthread_create(&tidA, NULL, &doit, NULL);
	Pthread_create(&tidB, NULL, &doit, NULL);

		/* 4wait for both threads to terminate */
	Pthread_join(tidA, NULL);
	Pthread_join(tidB, NULL);

	exit(0);
}

void *
doit(void *vptr)
{
	int		i, val;

	/*
	 * Each thread fetches, prints, and increments the counter NLOOP times.
	 * The value of the counter should increase monotonically.
	 */

	for (i = 0; i < NLOOP; i++) {
		// 加锁
		Pthread_mutex_lock(&counter_mutex);

		val = counter;
		printf("%d: %d\n", pthread_self(), val + 1);
		counter = val + 1;

		Pthread_mutex_unlock(&counter_mutex);
	}

	return(NULL);
}

※ 互斥锁的开销并不是很大,但是一旦过于频繁,还是会带来性能问题

26.8 条件变量

  互斥锁适用于防止同时访问某个共享变量,但是我们需要另外某种等待某个条件发生期间能让我们进入睡眠的东西。
  互斥锁提供互斥机制,条件变量提供信号机制。

#include<pthread.h>
int pthread_cond_wait(pthread_cond_t* cptr, pthread_mutex_t* mptr);
int pthread_cond_signal(pthraed_cond_t* cptr);
// 可以在信号发生时将唤醒多个线程
int pthread_cond_broadcast(pthraed_cond_t* cptr);
// 线程设置一个阻塞时间,注意这里的时间是一个绝对时间,也就是具体的时间(阻塞解除的时间),而不是阻塞的时间(阻塞的时长)
int pthread_cond_timedwait(pthraed_cond_t* cptr, pthread_mutex_t* mptr, const struct timespec* abstime);
// 返回:成功返回 0,出错返回正值的 Exxx

※ 条件变量一般和互斥锁一起使用

26.9 web 客户与同时连接(续)

使用互斥锁和条件变量修改了上面的程序

	while (nlefttoread > 0) {
		while (nconn < maxnconn && nlefttoconn > 0) {
				/* 4find a file to read */
			for (i = 0 ; i < nfiles; i++)
				if (file[i].f_flags == 0)
					break;
			if (i == nfiles)
				err_quit("nlefttoconn = %d but nothing found", nlefttoconn);

			file[i].f_flags = F_CONNECTING;
			Pthread_create(&tid, NULL, &do_get_read, &file[i]);
			file[i].f_tid = tid;
			nconn++;
			nlefttoconn--;
		}

			/* 4Wait for thread to terminate */
		Pthread_mutex_lock(&ndone_mutex);
		// 等待某个线程终止,睡眠由 Pthread_cond_wait 执行
		while (ndone == 0)
			Pthread_cond_wait(&ndone_cond, &ndone_mutex);
		// 当发现某个线程终止时,遍历 file 找到它,在调用 pthread_join
		for (i = 0; i < nfiles; i++) {
			if (file[i].f_flags & F_DONE) {
				Pthread_join(file[i].f_tid, (void **) &fptr);

				if (&file[i] != fptr)
					err_quit("file[i] != fptr");
				fptr->f_flags = F_JOINED;	/* clears F_DONE */
				ndone--;
				nconn--;
				nlefttoread--;
				printf("thread %d for %s done\n", fptr->f_tid, fptr->f_name);
			}
		}
		Pthread_mutex_unlock(&ndone_mutex);
	}

小结

  • 创建一个新的线程比 fork 一个子进程快得多,但是需要注意并发编程。
  • 统一进程中的线程共享:全局变量,描述符 等。这也引入了同步问题,可以使用 Pthread 的互斥锁和条件变量。
  • 后面会讲到线程池技术。