Skip to content

Plan 9のpipe実装を読む

  • パイプの書き込みバッファをどこに保存しているのか
  • パイプが詰まるのはバッファにどれだけ書き込まれたときか

クソデカ構造体なので一部抜粋。

struct Chan
{
ushort type;
ulong dev;
ushort mode; /* read/write */
Qid qid;
int fid; /* for devmnt */
ulong iounit; /* chunk size for i/o; 0==default */
union {
void* aux;
Qid pgrpid; /* for #p/notepg */
ulong mid; /* for ns in devproc */
};
};

pipe(2) で作成したファイルディスクリプタのペアを管理する構造体。

typedef struct Pipe Pipe;
struct Pipe
{
QLock;
Pipe *next;
int ref;
ulong path;
long perm;
Queue *q[2];
int qref[2];
};

pipe(2) の入口または出口に相当する構造体。Block 型のリストを管理する。

/*
* IO queues
*/
typedef struct Queue Queue;
struct Queue
{
Lock;
/* リストの最初と最後の要素へのポインタを管理する */
Block* bfirst; /* buffer */
Block* blast;
int len; /* bytes allocated to queue */
int dlen; /* data bytes in queue */
int limit; /* max bytes in queue */
int inilim; /* initial limit */
int state;
int noblock; /* true if writes return immediately when q full */
int eof; /* number of eofs read by user */
void (*kick)(void*); /* restart output */
void (*bypass)(void*, Block*); /* bypass queue altogether */
void* arg; /* argument to kick */
QLock rlock; /* mutex for reading processes */
Rendez rr; /* process waiting to read */
QLock wlock; /* mutex for writing processes */
Rendez wr; /* process waiting to write */
char err[ERRMAX];
};

パイプに書き込まれた/読み込まれる前のデータを表現する構造体。

typedef struct Block Block;
struct Block
{
long ref;
Block* next;
Block* list;
uchar* rp; /* first unconsumed byte */
uchar* wp; /* first empty byte */
uchar* lim; /* 1 past the end of the buffer */
uchar* base; /* start of the buffer */
void (*free)(Block*);
ushort flag;
ushort checksum; /* IP checksum of complete packet (minus media header) */
ulong magic;
};
#define BLEN(s) ((s)->wp - (s)->rp)
#define BALLOC(s) ((s)->lim - (s)->base)

カーネル内の syspipe(9) では #| をopenして Pipe 構造体を作っている。

int fd[2];
Chan *c[2];
Dev *d;
d = devtab[devno('|', 0)]; /* これは pipedevtab のこと */
c[0] = namec("#|", Atodir, 0, 0);
c[1] = cclone(c[0]);
c[0] = d->open(c[0], ORDWR); /* d->open は pipeopen と同等 */
c[1] = d->open(c[1], ORDWR);
newfd2(fd, c); /* ここでfdとChanを繋げている; このfdをユーザープロセスに返す */

Pipe 構造体は Queue 構造体(io queue)を2つ持っていて、パイプの入口/出口におおむね相当する。Queue には Block があり、それがpipeのバッファになっている。

ユーザーランドから pipe(2) した fd にアクセスすると、9pを経由して、カーネルは fd にマップされた Chan 構造体を探す。このとき見つかった Chanaux フィールドに Pipe 構造体がセットされている。

pipewrite(9) 関数では fd[1]write(2) した場合は p->q[1] に、fd[0] の場合は p->q[0]qwrite(9) する。qwrite(9) では write(2) されたバイト数だけ Block 構造体をアロケートして Queue に追加する。

int
qwrite(Queue *q, void *vp, int len)
{
int n, sofar = 0;
Block *b;
do {
n = len-sofar;
if(n > Maxatomic)
n = Maxatomic;
b = allocb(n);
memmove(b->wp, p+sofar, n);
b->wp += n;
qbwrite(q, b);
sofar += n;
} while(sofar < len && (q->state&Qmsg) == 0);
}

Maxatomic は64KBなので、同じパイプに複数プロセスが同時に書き込みをしても64KBまではデータが混ざらない。

enum
{
Maxatomic = 64*1024,
};
uint qiomaxatomic = Maxatomic;

qbwrite(9)Queue 構造体に Block 構造体を追加する。

/*
* add a block to a queue obeying flow control
*/
long
qbwrite(Queue *q, Block *b)
{
/* nonblockモードの動作は省略 */
int dowakeup = 0;
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
q->blast = b;
q->len += BALLOC(b);
q->dlen += n;
/* make sure other end gets awakend */
if(q->state & Qstarve){
q->state &= ~Qstarve;
dowakeup = 1;
}
/* get output going again */
if(q->kick && (dowakeup || (q->state&Qkick))
q->kick(q->arg);
/* wakeup anyone consuming at end other end */
if(dowakeup){
p = wakeup(&q->rr);
/* if we just wokeup a higher priority pricess, let it run */
if(p != nil && p->priority > up->priority)
sched();
}
for(;;){
if(qnotfull(q))
break;
q->state |= Qflow;
sleep(&q->wr, qnotfull, q);
}
}
static int
qnotfull(void *a)
{
Queue *q = a;
return q->len < q->limit || (q->state & Qclosed);
}

qbwrite(9) では q->limit を超えたらブロックする。では具体的な q->limit はいくつ? 正解は pipeattach(9) で設定している conf.pipeqsize がそれ。256KBまたは32KBだったはず。

static Chan*
pipeattach(char *spec)
{
...
p = malloc(sizeof(Pipe))
p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
...
}

ところでPlan 9の pipe(2) は双方向通信が可能らしい。

こっちは読んでないけど、パイプからreadするときは Queue 構造体に追加された Block を先頭から読んでいくはず。Block の途中まで読んだら b->rp が指すポインタが未読の位置まで移動する。そのため未読のデータは rpwp の間にある。Block をすべて読み終わったらリストから取り除く。

qbwrite(9) では必要になったときプロセスを切り替えている。このとき wakeup(9) を呼び出すが、その中では ready(9) が呼ばれる。

// qbwrite
wakeup(&q->rr);
// wakeup(Rendez *r)
Proc *p = r->p;
ready(p);
// ready(Proc *p)
updatecpu(p);
pri = reprioritize(p);
p->priority = pri;
rq = &runq[pri];
p->state = Ready;
queueproc(rq, p);

pipewrite(2) の途中で wakeup(9) しなくても勝手にスケジュールされるように思うけど、次はこの辺りから読むと理解が進むかもしれない。