I've had a report that FIO on Windows (at least Server 2012 R2 and 2016)
hangs when the attached script is run. The point at which it hangs is
apparently random, but it is always inside the condition variable
(pthread-win32) calls. I've replicated the hang, but I don't have time
to debug it, so I was hoping somebody on this mailing list might have
time to dig into it and figure out what's wrong.
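
In case it helps whoever picks this up: the stuck threads can be
inspected by attaching a MinGW gdb to the hung process (assuming the
fio binary was built with debug symbols), e.g.:

  gdb -p <fio PID>
  (gdb) thread apply all bt

This should show exactly which condvar call each thread is blocked in.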
The people I was talking to thought it might be due to FIO linking
against msvcrt.dll, which is old and not supposed to be used by
applications - they should use the CRT distributed with Visual C++,
such as msvcr120.dll, instead. However, fixing this properly looks like
quite a lot of work: FIO itself is relatively straightforward to
change, but pthread-win32 hasn't had a full release since 2012 and no
longer builds under a current msys environment due to duplicated
symbols and similar problems.
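
For reference, which CRT a given fio.exe actually links against can be
checked with binutils (assuming an MSYS/MinGW environment), e.g.:

  objdump -p fio.exe | grep "DLL Name"

If msvcrt.dll shows up there rather than a versioned msvcr*.dll, that
would be consistent with the theory above.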
--
Rebecca
#!/usr/bin/env python
#
# Storage Performance Profiler
# ----------------------------
# This framework facilitates running workloads with FIO.
# It also organises the results in easy-to-parse '.dat' files
# for later plotting with gnuplot.
#
# Author: Felipe Franciosi <felipe@xxxxxxxxxxx>
#
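# Example usage (assuming this file is saved as profiler.py):
#   python profiler.py -c profiler.cfg -v
# Without -c, a single job is built from the defaults below.
#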
from ast import literal_eval
from ConfigParser import ConfigParser
from optparse import OptionParser
from os import unlink, path
from subprocess import Popen, PIPE
from sys import exit, stdout
from tempfile import NamedTemporaryFile
import datetime
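# Example config file (each [section] becomes one profiler job, and any
# option omitted falls back to the defaults below; the section and file
# names here are made up for illustration):
#
#   [drive2-random]
#   filename = \\.\PhysicalDrive2
#   dirs = "randread", "randwrite"
#   outdat = drive2-random.dat
#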
defaults = {
# @filename: Name of device or file to profile
# Multiple files/devices can be specified with a ':' separator
"filename": "\\\.\PhysicalDrive1",
# @size: Amount of data to read/write from the start of @filename(s)
"size": "20G",
# @minjob: Starting number of I/O threads
"minjob": "1",
# @maxjob: Maximum number of I/O threads
"maxjob": "128",
    # @muljob: Increment the number of I/O threads in multiples of @muljob.
    #          The number of threads always starts at @minjob and is never
    #          higher than @maxjob. Examples:
    #          minjob=1, maxjob=10, muljob=4 generates {1, 4, 8}
    #          minjob=3, maxjob=12, muljob=4 generates {3, 4, 8, 12}
    "muljob": "2", # Increment JOBS in multiples of (e.g. 1, 4, 8)
# @mineqd: Starting effective queue depth (qd x numjob)
"mineqd": "1",
# @maxeqd: Maximum effective queue depth
"maxeqd": "128",
    # @muleqd: Increment the effective queue depth in multiples of @muleqd.
    #          This takes @muljob into consideration. If @muleqd is not a
    #          multiple of numjob x qd, the profiler rounds up to the next
    #          closest option, respecting numjob first (see the sketch
    #          after this dict). Examples:
    #          minjob=1, maxjob=4, muljob=2, mineqd=1, maxeqd=64, muleqd=32
    #          {eqd=1 (qd=1,nj=1), eqd=32 (qd=32,nj=1), eqd=64 (qd=64,nj=1)}
    #          {eqd=2 (qd=1,nj=2), eqd=32 (qd=16,nj=2), eqd=64 (qd=32,nj=2)}
    #          {eqd=4 (qd=1,nj=4), eqd=32 (qd=8,nj=4), eqd=64 (qd=16,nj=4)}
    #          Note: "qd" is per thread.
    "muleqd": "1", # Increment QD in multiples of (e.g. 1, 64, 128)
# @minbsz: Minimum block size (values are always in bytes)
"minbsz": "4096",
    # @maxbsz: Maximum block size (values are always in bytes)
# Note: block size is always incremented in powers of two
"maxbsz": "1048576",
# @runtime: Runtime for each spec, always in seconds
"runtime": "20",
# @dirs: Comma-separated list of directions. Each direction must be
# specified in quotes. Valid directions:
# "read" Sequential read
# "write" Sequential write
# "randread" Random reads
# "randwrite" Random writes
# "rw" Mixed sequential reads and writes (50/50)
# "randrw" Mixed random reads and writes (50/50)
"dirs": '"randread"',
# @outdat: Filename to write plottable text data with job results
"outdat": "test.dat",
}
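# Illustration only (not called anywhere): the @muleqd round-up rule
# described above, as implemented by the original spec-generation code
# before the hardcoded sweep in __createSpecs replaced it. Python 2
# integer division is intentional.
def _example_round_eqd(cureqd, muleqd, curjob):
    # Bump the effective queue depth to the next multiple of @muleqd...
    cureqd = muleqd * (1 + (cureqd / muleqd))
    # ...then, "respecting numjob first", round up again so the result
    # divides evenly across the current number of jobs.
    if cureqd % curjob:
        cureqd = curjob * ((cureqd / curjob) + 1)
    return cureqd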
dirs = [
"read",
"write",
"randread",
"randwrite",
"rw", # or readwrite
"randrw"
]
class ProfilerSpec(object):
def __init__(self, conf):
        assert('filename' in conf)
        assert('size' in conf)
        assert('runtime' in conf)
        assert('numjobs' in conf)
        assert('iodepth' in conf)
        assert('bs' in conf)
        assert('dir' in conf)
self.conf = conf
self.spec = None
self.data = {}
def createSpec(self):
fio_spec = """[global]
ioengine=windowsaio
direct=1
time_based
group_reporting
size={size}
runtime={runtime}
numjobs={numjobs}
iodepth={iodepth}
bs={bs}
rw={dir}
[job]
filename={filename}""".format(**self.conf)
try:
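            # delete=False is deliberate: per the tempfile docs, a named
            # temp file that is still open cannot be reopened by name on
            # Windows, and fio needs to open this file itself.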
self.spec = NamedTemporaryFile(delete=False)
self.spec.write(fio_spec)
self.spec.flush()
except:
if self.spec:
unlink(self.spec.name)
raise
def run(self):
assert(self.spec)
cmd = ["fio", self.spec.name, "--minimal", "--terse-version=3"]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
out, err = proc.communicate()
if err:
raise Exception(err)
res = out.split(';')
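        # fio terse v3 layout: fields 0-4 are the header (terse version,
        # fio version, jobname, groupid, error); read stats start at
        # field 5 and write stats at field 46, each beginning with
        # total KB, bandwidth (KB/s) and IOPS.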
self.data['read_bw'] = int(res[6])
self.data['read_iops'] = int(res[7])
self.data['write_bw'] = int(res[47])
self.data['write_iops'] = int(res[48])
def cleanup(self):
assert(self.spec)
try:
unlink(self.spec.name)
        except OSError:
pass
finally:
self.spec = None
class ProfilerJob(object):
def __init__(self, name, conf):
assert(name)
assert(conf)
self.name = name
self.conf = conf
self.specs = []
self.outdatfp = None
def append(self, spec):
self.specs.append(spec)
def run(self):
assert(len(self.specs) > 0)
print "* Running job: '%s' (%d secs / spec)" % (self.name,
int(self.conf['runtime']))
i=1
for spec in self.specs:
if i > 1:
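                # ANSI "cursor up" so each status line overwrites the last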
stdout.write("\033[F")
stdout.flush()
now = datetime.datetime.now()
print "** Executing spec %d/%d at %s" % (i, len(self.specs), now)
spec.createSpec()
try:
spec.run()
except:
raise
finally:
spec.cleanup()
i = i + 1
def writeDataFile(self):
assert(self.conf['outdat'])
self.outdatfp = open(self.conf['outdat'], 'w')
self.__writeDataFile()
def __writeDataFile(self):
data = """# FIO Results for "{filename}" (size={size})
# QD : {mineqd} -> {maxeqd} in multiples of: {muleqd}
# JOBS : {minjob} -> {maxjob} in multiples of: {muljob}
# BS : {minbsz} -> {maxbsz} in powers of two
""".format(**self.conf)
for dir in dirs:
data = data + """
# %s:
# Eff.QD Jobs QD blocksz IOPS_rd IOPS_wr KB/s_rd KB/s_wr
""" % dir
specs_dir = [ x for x in self.specs if x.conf['dir'] == dir ]
atleastone = False
for spec in specs_dir:
if spec.data:
atleastone = True
break
if not atleastone:
data = data + "0\n\n"
continue
for spec in specs_dir:
if not spec.data:
continue
effqd = spec.conf['numjobs'] * spec.conf['iodepth']
jobs = spec.conf['numjobs']
qd = spec.conf['iodepth']
bs = spec.conf['bs']
iopsr = spec.data['read_iops']
iopsw = spec.data['write_iops']
kbsr = spec.data['read_bw']
kbsw = spec.data['write_bw']
data = data + "%8d %5d %4d %8d %8d %8d %9d %9d\n" % (
effqd, jobs, qd, bs, iopsr, iopsw, kbsr, kbsw)
self.outdatfp.write(data)
self.outdatfp.flush()
class ProfilerConfig(object):
def __init__(self, configfile=None):
self.configfile = configfile
self.config = self.__parseConfig()
self.jobs = self.__createJobs()
def dumpConfig(self):
assert(self.config)
for section in self.config.sections():
print
print "["+section+"]"
for option in self.config.options(section):
if option == "dirs":
print "%s: %s" % (option, self.__getDirs(self.config, section))
else:
print "%s: %s" % (option, self.config.get(section, option))
def dumpSpecs(self):
assert(self.jobs)
for job in self.jobs:
for spec in job.specs:
print "%s: %s" % (job.name, spec.conf)
def __parseConfig(self):
config = ConfigParser(defaults)
if self.configfile:
config.read(self.configfile)
else:
# Create a single 'config' section using just defaults
config.add_section("config")
self.__validate(config)
return config
def __validate(self, config):
valid_opts = set(defaults)
valid_dirs = set(dirs + ["readwrite"])
for section in config.sections():
sect_opts = set(config.options(section))
if sect_opts != valid_opts:
raise Exception("Invalid options %s for section '%s'" %
(list(sect_opts - valid_opts), section))
sect_dirs_list = self.__getDirs(config, section)
sect_dirs = set(sect_dirs_list)
if not sect_dirs.issubset(valid_dirs):
raise Exception("Invalid dirs %s for section '%s'" %
(list(sect_dirs - valid_dirs), section))
# 'rw' and 'readwrite' are equivalent in 'fio'
if set(['rw', 'readwrite']).issubset(sect_dirs):
sect_dirs_list.remove('readwrite')
sect_dirs_str = str(sect_dirs_list).translate(None, "[]")
config.set(section, "dirs", sect_dirs_str)
if config.get(section, "outdat") is None:
raise Exception("Need 'outdat' for section '%s'" % section)
# TODO: Sanity check everything else (eg. bs % 512, min < max)
def __createJobs(self):
assert(self.config)
jobs = []
for section in self.config.sections():
job = ProfilerJob(section, dict(self.config.items(section)))
self.__createSpecs(job)
jobs.append(job)
return jobs
def __createSpecs(self, job):
section = job.name
minjob = int(self.config.get(section, "minjob"))
maxjob = int(self.config.get(section, "maxjob"))
muljob = int(self.config.get(section, "muljob"))
mineqd = int(self.config.get(section, "mineqd"))
maxeqd = int(self.config.get(section, "maxeqd"))
muleqd = int(self.config.get(section, "muleqd"))
minbsz = int(self.config.get(section, "minbsz"))
maxbsz = int(self.config.get(section, "maxbsz"))
        # NOTE (dabe): hardcoded hack - the min/max/mul options read above
        # are ignored from here on. Each pass doubles the block size and
        # sweeps an accumulating set of drives (1, 2, 4, 6 and 8 disks),
        # with numjobs = iodepth x disks and a per-thread iodepth of 1.
        bszcur = minbsz
        while bszcur <= maxbsz:
            filename = ''
            diskCount = 1
            drivelist = [r'\\.\PhysicalDrive1',
                         r'\\.\PhysicalDrive2',
                         r'\\.\PhysicalDrive3:\\.\PhysicalDrive4',
                         r'\\.\PhysicalDrive5:\\.\PhysicalDrive6',
                         r'\\.\PhysicalDrive7:\\.\PhysicalDrive8']
            for tmp_filename in drivelist:
                # Accumulate drives so 'filename' grows to cover all disks
                if len(filename) > 1:
                    filename = filename + ':' + tmp_filename
                else:
                    filename = tmp_filename
                size = self.config.get(section, 'size')
                runtime = self.config.get(section, 'runtime')
                for dir in self.__getDirs(self.config, section):
                    # iodepth here is really a job multiplier: it sweeps
                    # 1, 2, 6, 10, ... 30, and the whole depth is applied
                    # as extra jobs (qdperjob stays 1).
                    iodepth = 1
                    while iodepth <= 32:
                        curjob = iodepth * diskCount
                        qdperjob = 1
conf = {'filename': filename,
'size': size,
'runtime': runtime,
'dir': dir,
'numjobs': curjob,
'iodepth': qdperjob,
'bs': bszcur}
spec = ProfilerSpec(conf)
job.append(spec)
if iodepth == 1:
iodepth = iodepth + 1
else:
iodepth = iodepth + 4
if diskCount < 2:
diskCount = diskCount + 1
else:
diskCount = diskCount + 2
bszcur = bszcur*2
@staticmethod
def __getDirs(config, section):
assert(section)
# ConfigParser values don't cope with lists, so we store 'dirs' as a string
return literal_eval("["+config.get(section, "dirs")+"]")
class Profiler(object):
def __init__(self, configfile):
        self.config = ProfilerConfig(configfile)
# TODO: Ensure 'fio' is installed
def dumpConfig(self):
self.config.dumpConfig()
def dumpSpecs(self):
self.config.dumpSpecs()
def checkOutDat(self, overwrite):
for job in self.config.jobs:
if path.isdir(job.conf['outdat']):
raise Exception("Unable to write results to '%s': it's a directory")
if path.exists(job.conf['outdat']):
if overwrite:
print "Warning: overwriting file '%s'" % (job.conf['outdat'],)
else:
raise Exception("Refusing to overwrite file '%s': use -f" %
(job.conf['outdat'],))
    def runJobs(self):
        for job in self.config.jobs:
            try:
                job.run()
            except KeyboardInterrupt:
                print "\nInterrupted by keyboard, writing partial results only"
                job.writeDataFile()
                return
            job.writeDataFile()
def main(opts):
profiler = Profiler(opts.configfile)
if opts.verbose:
profiler.dumpConfig()
profiler.dumpSpecs()
if opts.dryrun:
return
profiler.checkOutDat(opts.overwrite)
profiler.runJobs()
if __name__ == "__main__":
parser = OptionParser(usage="usage: %prog [options]")
parser.add_option("-c", "--conf", dest="configfile",
help="Profiler configuration file")
parser.add_option("-v", "--verbose", dest="verbose",
action="store_true", default=False,
help="Dump config and specs")
parser.add_option("-n", "--dryrun", dest="dryrun",
action="store_true", default=False,
help="Just parse config file, don't run profiler")
parser.add_option("-f", "--force", dest="overwrite",
action="store_true", default=False,
help="Overwrite existing outdat files")
(opts, args) = parser.parse_args()
exit(main(opts))