Plan 9のpipe実装を読む
知りたいこと
Section titled “知りたいこと”- パイプの書き込みバッファをどこに保存しているのか
- パイプが詰まるのはバッファにどれだけ書き込まれたときか
登場するデータ型
Section titled “登場するデータ型”クソデカ構造体なので一部抜粋。
struct Chan{ ushort type; ulong dev; ushort mode; /* read/write */ Qid qid; int fid; /* for devmnt */ ulong iounit; /* chunk size for i/o; 0==default */ union { void* aux; Qid pgrpid; /* for #p/notepg */ ulong mid; /* for ns in devproc */ };};pipe(2) で作成したファイルディスクリプタのペアを管理する構造体。
typedef struct Pipe Pipe;struct Pipe{ QLock; Pipe *next; int ref; ulong path; long perm; Queue *q[2]; int qref[2];};pipe(2) の入口または出口に相当する構造体。Block 型のリストを管理する。
/* * IO queues */typedef struct Queue Queue;struct Queue{ Lock;
/* リストの最初と最後の要素へのポインタを管理する */ Block* bfirst; /* buffer */ Block* blast;
int len; /* bytes allocated to queue */ int dlen; /* data bytes in queue */ int limit; /* max bytes in queue */ int inilim; /* initial limit */ int state; int noblock; /* true if writes return immediately when q full */ int eof; /* number of eofs read by user */
void (*kick)(void*); /* restart output */ void (*bypass)(void*, Block*); /* bypass queue altogether */ void* arg; /* argument to kick */
QLock rlock; /* mutex for reading processes */ Rendez rr; /* process waiting to read */ QLock wlock; /* mutex for writing processes */ Rendez wr; /* process waiting to write */
char err[ERRMAX];};パイプに書き込まれた/読み込まれる前のデータを表現する構造体。
typedef struct Block Block;struct Block{ long ref; Block* next; Block* list; uchar* rp; /* first unconsumed byte */ uchar* wp; /* first empty byte */ uchar* lim; /* 1 past the end of the buffer */ uchar* base; /* start of the buffer */ void (*free)(Block*); ushort flag; ushort checksum; /* IP checksum of complete packet (minus media header) */ ulong magic;};
#define BLEN(s) ((s)->wp - (s)->rp)#define BALLOC(s) ((s)->lim - (s)->base)pipe(2)を実行したとき
Section titled “pipe(2)を実行したとき”カーネル内の syspipe(9) では #| をopenして Pipe 構造体を作っている。
int fd[2];Chan *c[2];Dev *d;
d = devtab[devno('|', 0)]; /* これは pipedevtab のこと */c[0] = namec("#|", Atodir, 0, 0);c[1] = cclone(c[0]);c[0] = d->open(c[0], ORDWR); /* d->open は pipeopen と同等 */c[1] = d->open(c[1], ORDWR);newfd2(fd, c); /* ここでfdとChanを繋げている; このfdをユーザープロセスに返す */Pipe 構造体は Queue 構造体(io queue)を2つ持っていて、パイプの入口/出口におおむね相当する。Queue には Block があり、それがpipeのバッファになっている。
ユーザーランドから pipe(2) した fd にアクセスすると、9pを経由して、カーネルは fd にマップされた Chan 構造体を探す。このとき見つかった Chan の aux フィールドに Pipe 構造体がセットされている。
パイプにwriteしたとき
Section titled “パイプにwriteしたとき”pipewrite(9) 関数では fd[1] に write(2) した場合は p->q[1] に、fd[0] の場合は p->q[0] に qwrite(9) する。qwrite(9) では write(2) されたバイト数だけ Block 構造体をアロケートして Queue に追加する。
intqwrite(Queue *q, void *vp, int len){ int n, sofar = 0; Block *b;
do { n = len-sofar; if(n > Maxatomic) n = Maxatomic; b = allocb(n); memmove(b->wp, p+sofar, n); b->wp += n; qbwrite(q, b); sofar += n; } while(sofar < len && (q->state&Qmsg) == 0);}Maxatomic は64KBなので、同じパイプに複数プロセスが同時に書き込みをしても64KBまではデータが混ざらない。
enum{ Maxatomic = 64*1024,};uint qiomaxatomic = Maxatomic;qbwrite(9) は Queue 構造体に Block 構造体を追加する。
/* * add a block to a queue obeying flow control */longqbwrite(Queue *q, Block *b){ /* nonblockモードの動作は省略 */ int dowakeup = 0; if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; q->len += BALLOC(b); q->dlen += n;
/* make sure other end gets awakend */ if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; }
/* get output going again */ if(q->kick && (dowakeup || (q->state&Qkick)) q->kick(q->arg);
/* wakeup anyone consuming at end other end */ if(dowakeup){ p = wakeup(&q->rr); /* if we just wokeup a higher priority pricess, let it run */ if(p != nil && p->priority > up->priority) sched(); }
for(;;){ if(qnotfull(q)) break; q->state |= Qflow; sleep(&q->wr, qnotfull, q); }}
static intqnotfull(void *a){ Queue *q = a; return q->len < q->limit || (q->state & Qclosed);}qbwrite(9) では q->limit を超えたらブロックする。では具体的な q->limit はいくつ?
正解は pipeattach(9) で設定している conf.pipeqsize がそれ。256KBまたは32KBだったはず。
static Chan*pipeattach(char *spec){ ... p = malloc(sizeof(Pipe)) p->q[0] = qopen(conf.pipeqsize, 0, 0, 0); p->q[1] = qopen(conf.pipeqsize, 0, 0, 0); ...}ところでPlan 9の pipe(2) は双方向通信が可能らしい。
パイプからreadしたとき
Section titled “パイプからreadしたとき”こっちは読んでないけど、パイプからreadするときは Queue 構造体に追加された Block を先頭から読んでいくはず。Block の途中まで読んだら b->rp が指すポインタが未読の位置まで移動する。そのため未読のデータは rp 〜 wp の間にある。Block をすべて読み終わったらリストから取り除く。
qbwrite(9) では必要になったときプロセスを切り替えている。このとき wakeup(9) を呼び出すが、その中では ready(9) が呼ばれる。
// qbwritewakeup(&q->rr);
// wakeup(Rendez *r)Proc *p = r->p;ready(p);
// ready(Proc *p)updatecpu(p);pri = reprioritize(p);p->priority = pri;rq = &runq[pri];p->state = Ready;queueproc(rq, p);pipewrite(2) の途中で wakeup(9) しなくても勝手にスケジュールされるように思うけど、次はこの辺りから読むと理解が進むかもしれない。