// Move previously deferred submission entries (the backlog) into the
// submission queue. When the queue is full, ask the kernel to consume
// entries first so there is room for the next one.
loop {
    if sq.is_full() {
        match submitter.submit() {
            Ok(_) => (),
            // EBUSY: stop draining for now and fall through to completion
            // handling; whatever is left stays in the backlog for a later pass.
            Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => break,
            // Every other submit failure is fatal. A match guard (rather than
            // an `if` inside the arm body) is required here, otherwise this
            // arm can never be reached and such errors are silently ignored.
            Err(err) => panic!("io_uring submit failed: {:?}", err),
        }
    }
    // NOTE(review): presumably refreshes this handle's view of the shared
    // queue state after the submit above -- confirm against the io-uring docs.
    sq.sync();

    match backlog.pop_front() {
        Some(sqe) => {
            // SAFETY (assumed, confirm): the buffers and file descriptors
            // referenced by the entry must stay valid until its completion
            // has been reaped.
            let rejected = unsafe { sq.push(&sqe).is_err() };
            if rejected {
                // Still no room: keep the entry (at the front, to preserve
                // order) instead of dropping it, and retry on a later pass.
                backlog.push_front(sqe);
                break;
            }
        }
        None => break,
    }
}

// NOTE(review): `push_to` is defined elsewhere; presumably it queues further
// accept operations on the listening socket -- confirm.
accept.push_to(&mut sq);
// Handle every completion that is currently available.
for cqe in &mut cq {
    let ret = cqe.result();
    // `user_data` carries the `token_alloc` key attached when the operation
    // was submitted.
    let token_index = cqe.user_data() as usize;

    // A negative result is a negated errno: report it and skip this completion.
    if ret < 0 {
        eprintln!(
            "token {:?} error: {:?}",
            token_alloc.get(token_index),
            io::Error::from_raw_os_error(-ret)
        );
        continue;
    }

    let token = &mut token_alloc[token_index];
    // Match on a clone so the arms can overwrite `*token` or touch
    // `token_alloc` without holding a borrow of the matched value.
    match token.clone() {
        // An accept completed; `ret` is used as the new connection's fd.
        Token::Accept => {
            accept.count += 1;
            let fd = ret;
            sockets.push(fd);

            // Register a poll for readability on the new socket, tracked by
            // its own token.
            let poll_token = token_alloc.insert(Token::Poll { fd });
            let poll_e = opcode::PollAdd::new(types::Fd(fd), libc::POLLIN as _)
                .build()
                .user_data(poll_token as _);

            // If the submission queue has no room, defer the entry to the
            // backlog instead of losing it.
            unsafe {
                if sq.push(&poll_e).is_err() {
                    backlog.push_back(poll_e);
                }
            }
        }
// The poll registered for `fd` completed: pick a receive buffer and queue a
// Recv on the socket, reusing this completion's token slot.
Token::Poll { fd } => {
    let recycled = bufpool.pop();
    let (buf_index, buf) = if let Some(idx) = recycled {
        // Reuse a buffer that was handed back to the pool.
        (idx, &mut buf_alloc[idx])
    } else {
        // Pool is empty: allocate a fresh 2048-byte buffer in a new slot.
        let fresh = vec![0u8; 2048].into_boxed_slice();
        let slot = buf_alloc.vacant_entry();
        let idx = slot.key();
        (idx, slot.insert(fresh))
    };

    // The same token now tracks the in-flight read and the buffer it uses.
    *token = Token::Read { fd, buf_index };

    let read_e = opcode::Recv::new(types::Fd(fd), buf.as_mut_ptr(), buf.len() as _)
        .build()
        .user_data(token_index as _);

    // Defer to the backlog when the submission queue is full.
    unsafe {
        let rejected = sq.push(&read_e).is_err();
        if rejected {
            backlog.push_back(read_e);
        }
    }
}
// A Recv on `fd` completed; `ret` is the number of bytes received.
Token::Read { fd, buf_index } => {
    if ret == 0 {
        // Zero bytes is treated as connection shutdown: recycle the buffer,
        // drop the token, forget the socket and close it.
        bufpool.push(buf_index);
        token_alloc.remove(token_index);

        println!("shutdown");

        // `retain` rather than an index loop calling `remove(i)`: removing
        // while walking a pre-computed index range shifts the tail and then
        // indexes past the shortened vector, which panics unless the closed
        // socket happened to be the last element.
        sockets.retain(|&sock| sock != fd);

        unsafe {
            libc::close(fd);
        }
    } else {
        let len = ret as usize;
        let buf = &buf_alloc[buf_index];

        // The read token is finished; each outgoing send gets its own token.
        token_alloc.remove(token_index);

        // Send the received bytes to every tracked socket.
        // NOTE(review): all of these sends share `buf_index`, and the Write
        // completion path pushes `buf_index` back to `bufpool` once per send --
        // confirm the buffer cannot be reused while other sends are in flight.
        for &peer in &sockets {
            let write_token = Token::Write { fd: peer, buf_index, len, offset: 0 };
            let write_token_index = token_alloc.insert(write_token);

            let write_e = opcode::Send::new(types::Fd(peer), buf.as_ptr(), len as _)
                .build()
                .user_data(write_token_index as _);

            // Defer to the backlog when the submission queue is full.
            unsafe {
                if sq.push(&write_e).is_err() {
                    backlog.push_back(write_e);
                }
            }
        }
    }
}
// A send/write on `fd` completed; `ret` is the number of bytes written.
// `offset` is how many bytes had been written before this completion and
// `len` is the total length of the message in the buffer.
Token::Write { fd, buf_index, offset, len } => {
    let write_len = ret as usize;

    // Build the follow-up operation for this socket.
    let entry = if offset + write_len >= len {
        // Whole message written: hand the buffer back and go back to polling
        // the socket for readability.
        bufpool.push(buf_index);

        *token = Token::Poll { fd };

        opcode::PollAdd::new(types::Fd(fd), libc::POLLIN as _)
            .build()
            .user_data(token_index as _)
    } else {
        // Short write: continue from where the kernel stopped.
        let offset = offset + write_len;
        let remaining = len - offset;
        let buf = &buf_alloc[buf_index][offset..];

        // Keep `len` as the total message length. Storing the remaining
        // length here would make the `offset + write_len >= len` check above
        // compare an absolute offset against a relative length on the next
        // completion (finishing early or underflowing `len - offset`).
        *token = Token::Write { fd, buf_index, offset, len };

        opcode::Write::new(types::Fd(fd), buf.as_ptr(), remaining as _)
            .build()
            .user_data(token_index as _)
    };
// Move previously deferred submission entries (the backlog) into the
// submission queue. When the queue is full, ask the kernel to consume
// entries first so there is room for the next one.
loop {
    if sq.is_full() {
        match submitter.submit() {
            Ok(_) => (),
            // EBUSY: stop draining for now and fall through to completion
            // handling; whatever is left stays in the backlog for a later pass.
            Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => break,
            // Every other submit failure is fatal. A match guard (rather than
            // an `if` inside the arm body) is required here, otherwise this
            // arm can never be reached and such errors are silently ignored.
            Err(err) => panic!("io_uring submit failed: {:?}", err),
        }
    }
    // NOTE(review): presumably refreshes this handle's view of the shared
    // queue state after the submit above -- confirm against the io-uring docs.
    sq.sync();

    match backlog.pop_front() {
        Some(sqe) => {
            // SAFETY (assumed, confirm): the buffers and file descriptors
            // referenced by the entry must stay valid until its completion
            // has been reaped.
            let rejected = unsafe { sq.push(&sqe).is_err() };
            if rejected {
                // Still no room: keep the entry (at the front, to preserve
                // order) instead of dropping it, and retry on a later pass.
                backlog.push_front(sqe);
                break;
            }
        }
        None => break,
    }
}

// NOTE(review): `push_to` is defined elsewhere; presumably it queues further
// accept operations on the listening socket -- confirm.
accept.push_to(&mut sq);
// Handle every completion that is currently available.
for cqe in &mut cq {
    let ret = cqe.result();
    // `user_data` carries the `token_alloc` key attached when the operation
    // was submitted.
    let token_index = cqe.user_data() as usize;

    // A negative result is a negated errno: report it and skip this completion.
    if ret < 0 {
        eprintln!(
            "token {:?} error: {:?}",
            token_alloc.get(token_index),
            io::Error::from_raw_os_error(-ret)
        );
        continue;
    }

    let token = &mut token_alloc[token_index];
    // Match on a clone so the arms can overwrite `*token` or touch
    // `token_alloc` without holding a borrow of the matched value.
    match token.clone() {
        // An accept completed; `ret` is used as the new connection's fd.
        Token::Accept => {
            accept.count += 1;
            let fd = ret;
            sockets.push(fd);

            // Register a poll for readability on the new socket, tracked by
            // its own token.
            let poll_token = token_alloc.insert(Token::Poll { fd });
            let poll_e = opcode::PollAdd::new(types::Fd(fd), libc::POLLIN as _)
                .build()
                .user_data(poll_token as _);

            // If the submission queue has no room, defer the entry to the
            // backlog instead of losing it.
            unsafe {
                if sq.push(&poll_e).is_err() {
                    backlog.push_back(poll_e);
                }
            }
        }
// The poll registered for `fd` completed: pick a receive buffer and queue a
// Recv on the socket, reusing this completion's token slot.
Token::Poll { fd } => {
    let recycled = bufpool.pop();
    let (buf_index, buf) = if let Some(idx) = recycled {
        // Reuse a buffer that was handed back to the pool.
        (idx, &mut buf_alloc[idx])
    } else {
        // Pool is empty: allocate a fresh 2048-byte buffer in a new slot.
        let fresh = vec![0u8; 2048].into_boxed_slice();
        let slot = buf_alloc.vacant_entry();
        let idx = slot.key();
        (idx, slot.insert(fresh))
    };

    // The same token now tracks the in-flight read and the buffer it uses.
    *token = Token::Read { fd, buf_index };

    let read_e = opcode::Recv::new(types::Fd(fd), buf.as_mut_ptr(), buf.len() as _)
        .build()
        .user_data(token_index as _);

    // Defer to the backlog when the submission queue is full.
    unsafe {
        let rejected = sq.push(&read_e).is_err();
        if rejected {
            backlog.push_back(read_e);
        }
    }
}
// A Recv on `fd` completed; `ret` is the number of bytes received.
Token::Read { fd, buf_index } => {
    if ret == 0 {
        // Zero bytes is treated as connection shutdown: recycle the buffer,
        // drop the token, forget the socket and close it.
        bufpool.push(buf_index);
        token_alloc.remove(token_index);

        println!("shutdown");

        // `retain` rather than an index loop calling `remove(i)`: removing
        // while walking a pre-computed index range shifts the tail and then
        // indexes past the shortened vector, which panics unless the closed
        // socket happened to be the last element.
        sockets.retain(|&sock| sock != fd);

        unsafe {
            libc::close(fd);
        }
    } else {
        let len = ret as usize;
        let buf = &buf_alloc[buf_index];

        // The read token is finished; each outgoing send gets its own token.
        token_alloc.remove(token_index);

        // Send the received bytes to every tracked socket.
        // NOTE(review): all of these sends share `buf_index`, and the Write
        // completion path pushes `buf_index` back to `bufpool` once per send --
        // confirm the buffer cannot be reused while other sends are in flight.
        for &peer in &sockets {
            let write_token = Token::Write { fd: peer, buf_index, len, offset: 0 };
            let write_token_index = token_alloc.insert(write_token);

            let write_e = opcode::Send::new(types::Fd(peer), buf.as_ptr(), len as _)
                .build()
                .user_data(write_token_index as _);

            // Defer to the backlog when the submission queue is full.
            unsafe {
                if sq.push(&write_e).is_err() {
                    backlog.push_back(write_e);
                }
            }
        }
    }
}
// A send/write on `fd` completed; `ret` is the number of bytes written.
// `offset` is how many bytes had been written before this completion and
// `len` is the total length of the message in the buffer.
Token::Write { fd, buf_index, offset, len } => {
    let write_len = ret as usize;

    // Build the follow-up operation for this socket.
    let entry = if offset + write_len >= len {
        // Whole message written: hand the buffer back and go back to polling
        // the socket for readability.
        bufpool.push(buf_index);

        *token = Token::Poll { fd };

        opcode::PollAdd::new(types::Fd(fd), libc::POLLIN as _)
            .build()
            .user_data(token_index as _)
    } else {
        // Short write: continue from where the kernel stopped.
        let offset = offset + write_len;
        let remaining = len - offset;
        let buf = &buf_alloc[buf_index][offset..];

        // Keep `len` as the total message length. Storing the remaining
        // length here would make the `offset + write_len >= len` check above
        // compare an absolute offset against a relative length on the next
        // completion (finishing early or underflowing `len - offset`).
        *token = Token::Write { fd, buf_index, offset, len };

        opcode::Write::new(types::Fd(fd), buf.as_ptr(), remaining as _)
            .build()
            .user_data(token_index as _)
    };