1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use libc::{size_t, ssize_t};
use std::{io, ptr};
use std::os::unix::io::RawFd;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::mpsc::Sender;
mod raw {
use std::os::unix::io::RawFd;
use ::libc::{c_longlong, size_t, ssize_t, c_uint};
#[cfg(target_arch="x86_64")]
pub const SPLICE_F_NONBLOCK: c_uint = 2;
#[allow(non_camel_case_types)]
pub type loff_t = c_longlong;
extern {
pub fn splice(fd_in: RawFd, off_in: *mut loff_t, fd_out: RawFd, off_out: *mut loff_t,
len: size_t, flags: c_uint) -> ssize_t;
}
}
/// Blocking behaviour requested from a `splice` call.
enum SpliceMode {
/// Call `splice` with no flags set.
Block,
/// Call `splice` with `SPLICE_F_NONBLOCK` set.
// Not constructed anywhere in the visible code, hence the lint exemption.
#[allow(dead_code)]
NonBlock
}
/// Safe wrapper around the raw `splice` system call.
///
/// Requests that up to `len` bytes be moved from `fd_in` to `fd_out`, passing
/// null offsets for both descriptors. `mode` selects whether
/// `SPLICE_F_NONBLOCK` is set.
///
/// Returns the value reported by the system call, or the last OS error when
/// the call returns `-1`.
fn splice(fd_in: &RawFd, fd_out: &RawFd, len: size_t, mode: SpliceMode) -> io::Result<ssize_t> {
    let flags = match mode {
        SpliceMode::NonBlock => raw::SPLICE_F_NONBLOCK,
        SpliceMode::Block => 0,
    };
    // SAFETY: both offset pointers are null and the descriptors are plain
    // integers passed by value, so no memory owned by Rust is handed over.
    let transferred = unsafe {
        raw::splice(*fd_in, ptr::null_mut(), *fd_out, ptr::null_mut(), len, flags)
    };
    // Read the OS error immediately after the call, before anything can overwrite it.
    if transferred == -1 {
        return Err(io::Error::last_os_error());
    }
    Ok(transferred)
}
/// Byte count requested from each `splice` call made by `splice_loop`.
static SPLICE_BUFFER_SIZE: size_t = 1024;
/// Relays data from `fd_in` to `fd_out` with blocking `splice` calls until a
/// stop is requested or an unrecoverable error occurs.
///
/// * `do_flush` - shared stop flag, checked before every transfer. This
///   function also sets it when it stops because of an unrecoverable error.
/// * `flush_event` - optional channel that receives a single `()` once the
///   loop has ended.
/// * `fd_in` / `fd_out` - source and destination descriptors given to `splice`.
///
/// Error handling:
/// * `BrokenPipe` is ignored and the transfer is attempted again.
/// * `Interrupted` (the call was cut short by a signal) is retried, so a stray
///   signal does not shut the relay down; the stop flag is re-checked first.
/// * Any other error sets `do_flush` and ends the loop.
///
/// NOTE(review): a successful return of 0 is not distinguished from a real
/// transfer, so the loop immediately tries again. splice(2) documents 0 as
/// end of input; confirm whether that case needs dedicated handling.
pub fn splice_loop(do_flush: Arc<AtomicBool>, flush_event: Option<Sender<()>>, fd_in: RawFd, fd_out: RawFd) {
    while !do_flush.load(Relaxed) {
        match splice(&fd_in, &fd_out, SPLICE_BUFFER_SIZE, SpliceMode::Block) {
            Ok(_) => {}
            Err(e) => match e.kind() {
                // Both are treated as transient: go round again, which
                // re-checks the stop flag before the next attempt.
                io::ErrorKind::BrokenPipe | io::ErrorKind::Interrupted => {}
                _ => {
                    // Propagate the stop request to everyone sharing the flag.
                    do_flush.store(true, Relaxed);
                    break;
                }
            },
        }
    }
    if let Some(event) = flush_event {
        // Best effort: a send failure is deliberately ignored.
        let _ = event.send(());
    }
}