I've updated the simulation, and it turns out it drops frames just as often as before, despite operating at 60fps rather than ~150fps. I get an occasional drop of 8 frames, which means a 400+ms delay in the write calls. Then things are just fine for a while, until I miss 8 again. I multithreaded the stream write calls since that's what I currently do in the real program, but I disabled OpenMP and the numbers are all the same with or without it. iotop reports 330-350MB/s of write activity and htop reports 70-92% CPU usage.
[10:30:33.386036000] [6] min: 11ms max: 472ms avg: 14.1086799ms std: 8.1730857ms count = 6648 dropped: 16 transferred: 36.65G totaltime: 333166ms
I've attached my benchmark program, but I use a lot of Boost C++ along with a small internal set of libraries... so you can see what I'm doing, but it likely won't compile for you. I'll also mention that Boost adds very little overhead (if any) over the normal system calls one would use (verified by reading the source code in use).
So I'll end on the same question: any ideas on how to squash that occasional 400+ms latency? Reducing CPU usage? Should I just record to the RAID directly without a filesystem? I guess that could be an interesting test to run tomorrow...
-Jason
#include <iostream> #include <vector> #include <memory> #include <iomanip> #include <exception> #include <functional> #include <algorithm> #include <cmath> #define BOOST_FILESYSTEM_VERSION 3 #define BOOST_ASIO_DISABLE_EPOLL//linux sucks with poll not implemented on filesystems #include <boost/algorithm/string.hpp> #include <boost/array.hpp> #include <boost/assert.hpp> #include <boost/bind.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time_io.hpp> #include <boost/date_time/gregorian/gregorian.hpp> #include <boost/asio.hpp> #include <boost/chrono.hpp> #include <boost/filesystem.hpp> #include <boost/program_options.hpp> #include <boost/regex.hpp> #include <boost/thread.hpp> #include <errno.h> #include <poll.h> #include <unistd.h> #include <sys/mman.h> #include <sys/timerfd.h> #include <xfs/xfs.h> #include "apps/common.h" #include "common/log.h" #include "common/sched.h" #include "vision/types.h" namespace bfs = boost::filesystem; namespace asio = boost::asio; namespace bpt = boost::posix_time; namespace bch = boost::chrono; namespace po = boost::program_options; int sigpipes[2]; void sighandler(int signum, siginfo_t *info, void *p){ printf("signal: %s\n", strsignal(signum)); write(sigpipes[1], &signum, sizeof(signum)); } enum cmd_t{ REQ_EXIT }; const mode_t filemode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; int open(const char* path, int flags, mode_t mode, boost::system::error_code& ec) { errno = 0; int result = boost::asio::detail::descriptor_ops::error_wrapper(::open(path, flags, mode), ec); if (result >= 0) ec = boost::system::error_code(); return result; } struct frame_header_t{ uint32_t width; uint32_t height; uint32_t framenumber; uint8_t channels; uint8_t bpp; uint32_t bytes; } __attribute__((packed)); int main(int argc, char *argv[]){ int verbosity; bool async_logging; std::string fname; int method; po::options_description clspec("Options"); clspec.add_options() ("help,h", "see available options") ("file", 
po::value(&fname), "destination file") ("verbosity,v", po::value(&verbosity)->default_value((int)logging::info), boost::str(boost::format("log level: %d - %d (everything)")%(int)logging::emergency%(int)logging::debug).c_str()) ("async-logging", po::value(&async_logging)->default_value(true)->implicit_value(true), "asynchronous logging") ("method,m", po::value(&method)->default_value(0), "method, 0 for split, 1 for vectorized") ; po::variables_map vmap; po::store(po::parse_command_line(argc, argv, clspec), vmap); if(vmap.count("help")){ std::cout<<clspec<<std::endl; return 0; } po::notify(vmap); init_logging_stdout(async_logging); configure_logger((logging::severity_level) verbosity); { sigset_t mask; sigemptyset(&mask); sigfillset(&mask); sigdelset(&mask, SIGALRM); //sigaddset(&mask, SIGINT); //sigaddset(&mask, SIGQUIT); sigprocmask(SIG_BLOCK, &mask, NULL);//block signals } { int status = pipe(sigpipes); if(status != 0){ lerror()<<"Pipe returned "<<status<<": "<<strerror(errno); } } mlockall(MCL_CURRENT|MCL_FUTURE); set_sched(SCHED_RR, 30); auto start = bch::steady_clock::now(); struct sigaction act = {}; act.sa_sigaction = sighandler; act.sa_flags = SA_SIGINFO; sigaction(SIGINT, &act, NULL); sigaction(SIGQUIT, &act, NULL); sigaction(SIGTERM, &act, NULL); const int nstreams = 3; asio::io_service ioservice; boost::system::error_code ec; std::vector<boost::shared_ptr<asio::posix::stream_descriptor> > streams; dioattr dioinfo; for(int i = 0; i < nstreams; ++i){ std::string fname_l = boost::str(boost::format("%s_%d")%fname%i); boost::shared_ptr<asio::posix::stream_descriptor> f = boost::make_shared<asio::posix::stream_descriptor>(ioservice, open(fname_l.c_str(), O_CREAT|O_TRUNC|O_WRONLY, filemode, ec)); //d_mem, d_miniosz, d_maxiosz if(xfsctl(fname_l.c_str(), f->native_handle(), XFS_IOC_DIOINFO, &dioinfo) < 0){ lerror()<<"error getting DIOINFO: "<<strerror(errno); } struct fsxattr fattr = {}; if(xfsctl(fname_l.c_str(), f->native_handle(), XFS_IOC_FSGETXATTR, &fattr) < 
0){ lerror()<<"error getting file attrs: "<<strerror(errno); } fattr.fsx_xflags |= XFS_XFLAG_REALTIME; if(xfsctl(fname_l.c_str(), f->native_handle(), XFS_IOC_FSSETXATTR, &fattr) < 0){ lerror()<<"error setting file attrs: "<<strerror(errno); } linfo()<<boost::format("xfs info: min: %d max: %d alignment: %d ")%dioinfo.d_miniosz%dioinfo.d_maxiosz%dioinfo.d_mem; streams.push_back(f); } typedef uint16_t PixelT; const int alignment = dioinfo.d_mem; const int blocksize = dioinfo.d_miniosz; int height = 1445; int width = 2048; int linebytes = width * sizeof(PixelT); size_t total_bytes = sizeof(frame_header_t) + height * linebytes; linfo()<<"minimum frame bytes: "<<total_bytes; total_bytes = (total_bytes + blocksize - 1) / blocksize * blocksize; if(total_bytes > dioinfo.d_maxiosz){ lerror()<<"larger than d_maxiosz"; } uint8_t *frame_data_mem = aligned_malloc<uint8_t>(total_bytes, alignment); linfo()<<"aligned malloc frame ptr: "<<frame_data_mem<<" length: "<<total_bytes; Image<PixelT> frame(frame_data_mem + sizeof(frame_header_t), height, width); for(int r = 0; r < frame.height; ++r){ for(int c = 0; c < frame.width; ++c){ frame(r,c) = r + c; } } linfo()<<"Beginning benchmark"; int timer_fd = timerfd_create(CLOCK_MONOTONIC, 0); { const timespec timeout = { 0, 1 }; const timespec period = {0 , long(50L * 1e6)};//50ms period itimerspec tspec = {period, timeout}; timerfd_settime(timer_fd, 0, &tspec, NULL); } bch::steady_clock::time_point bstart = bch::steady_clock::now(); std::vector<int64_t> dts; int breaks = 0; int missed_deadlines_total = 0; bool doexit = false; for(unsigned i = 0; ;++i){ for(;;) { pollfd poll_fd[2] = {}; poll_fd[0].fd = sigpipes[0]; poll_fd[0].events = POLLIN; poll_fd[1].fd = timer_fd; poll_fd[1].events = POLLIN; sigset_t signal_set; sigemptyset(&signal_set); //sigaddset(&signal_set, SIGINT); //sigaddset(&signal_set, SIGQUIT); int status = ppoll(&poll_fd[0], 2, NULL, &signal_set); if(status > 0){ if(poll_fd[0].revents & POLLIN){ int sig; int ret = 
read(sigpipes[0], &sig, sizeof(sig)); //linfo()<<"signal sigpipe path"; if(ret == sizeof(sig)){ if(sig == SIGINT || sig == SIGQUIT || sig == SIGTERM){ doexit = true; break; } }else{ linfo()<<"sigpipe received signal ("<<sig<<") "<<strsignal(sig); } } if(poll_fd[1].revents & POLLIN){ uint64_t expirations; int status = read(timer_fd, &expirations, sizeof(expirations)); if(status == sizeof(expirations) && expirations > 0){ if(expirations > 1){ lwarning()<<boost::format("Missed %d deadlines")%(expirations-1); } missed_deadlines_total += expirations - 1; break; } } } } if(doexit){ break; } bch::steady_clock::time_point start = bch::steady_clock::now(); #pragma omp parallel for default(shared) for(int s = 0; s < nstreams; ++s) //direct io version { auto f = streams[s]; frame_header_t header = { frame.width, frame.height, i, 1, 10, (uint32_t)(total_bytes - sizeof(frame_header_t)) }; *((frame_header_t *) frame_data_mem) = header; asio::write(*f, asio::buffer(frame_data_mem, total_bytes), ec); posix_fadvise(f->native_handle(), 0, lseek(f->native_handle(), 0, SEEK_CUR), POSIX_FADV_DONTNEED); if(ec){ ldebug()<<boost::format("Failure writing %s")%ec.message(); f->close(ec); if(ec){ lerror()<<boost::format("Error closing framestream file: %s")%ec.message(); } #pragma omp critical { breaks++; } } } if(breaks){ break; } bch::steady_clock::time_point end = bch::steady_clock::now(); int64_t dt = bch::duration_cast<bch::microseconds>(end - start).count(); dts.push_back(dt); } bch::steady_clock::time_point bend = bch::steady_clock::now(); int64_t min = std::numeric_limits<int64_t>::max(); int64_t max = std::numeric_limits<int64_t>::min(); double avg = 0, var = 0; for(auto it = dts.begin(); it != dts.end(); ++it){ min = std::min(min, *it); max = std::max(max, *it); avg += *it; } avg /= dts.size(); for(auto it = dts.begin(); it != dts.end(); ++it){ var += (*it - avg) * (*it - avg); } var /= dts.size(); int64_t tt = bch::duration_cast<bch::microseconds>(bend - bstart).count(); 
linfo()<<boost::format("min: %dms max: %dms avg: %.7fms std: %.7fms count = %d dropped: %d transferred: %.2fG totaltime: %dms")%(min/1000)%(max/1000)%(avg/1000)%(std::sqrt(var)/1000)%dts.size()%missed_deadlines_total%(frame.width*frame.height*sizeof(PixelT)*dts.size()/(1024*1024*1024.0))%(int(tt/1e3)); return 0; }
_______________________________________________ xfs mailing list xfs@xxxxxxxxxxx http://oss.sgi.com/mailman/listinfo/xfs