Hi Larry,

May I know which GlusterFS version is in use? Can you try stat-prefetch on the
client side, above write-behind? (See the sketch further down, just before the
script, for what I mean.)

--
Harshavardhana
Gluster - http://www.gluster.com

On Tue, Jan 26, 2010 at 6:41 PM, Larry Bates <larry.bates at vitalesafe.com> wrote:

> Sure if you want to take a look.
>
> -Larry
>
> Two servers (gfs01 and gfs02) are configured with the following scripts
> (they differ only by server names):
>
> volume vol1
>   type storage/posix                      # POSIX FS translator
>   option directory /mnt/glusterfs/vol1    # Export this directory
>   option background-unlink yes            # unlink in background
> end-volume
>
> volume vol2
>   type storage/posix
>   option directory /mnt/glusterfs/vol2
>   option background-unlink yes
> end-volume
>
> volume vol3
>   type storage/posix
>   option directory /mnt/glusterfs/vol3
>   option background-unlink yes
> end-volume
>
> volume vol4
>   type storage/posix
>   option directory /mnt/glusterfs/vol4
>   option background-unlink yes
> end-volume
>
> volume vol5
>   type storage/posix
>   option directory /mnt/glusterfs/vol5
>   option background-unlink yes
> end-volume
>
> volume vol6
>   type storage/posix
>   option directory /mnt/glusterfs/vol6
>   option background-unlink yes
> end-volume
>
> volume vol7
>   type storage/posix
>   option directory /mnt/glusterfs/vol7
>   option background-unlink yes
> end-volume
>
> volume vol8
>   type storage/posix
>   option directory /mnt/glusterfs/vol8
>   option background-unlink yes
> end-volume
>
> volume iot1
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol1
> end-volume
>
> volume iot2
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol2
> end-volume
>
> volume iot3
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol3
> end-volume
>
> volume iot4
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol4
> end-volume
>
> volume iot5
>   type performance/io-threads
>   option thread-count 8
>   subvolumes vol5
> end-volume
>
> volume iot6
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol6
> end-volume
>
> volume iot7
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol7
> end-volume
>
> volume iot8
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol8
> end-volume
>
> volume gfs01brick1
>   type features/locks
>   subvolumes iot1
> end-volume
>
> volume gfs01brick2
>   type features/locks
>   subvolumes iot2
> end-volume
>
> volume gfs01brick3
>   type features/locks
>   subvolumes iot3
> end-volume
>
> volume gfs01brick4
>   type features/locks
>   subvolumes iot4
> end-volume
>
> volume gfs01brick5
>   type features/locks
>   subvolumes iot5
> end-volume
>
> volume gfs01brick6
>   type features/locks
>   subvolumes iot6
> end-volume
>
> volume gfs01brick7
>   type features/locks
>   subvolumes iot7
> end-volume
>
> volume gfs01brick8
>   type features/locks
>   subvolumes iot8
> end-volume
>
> ## Add network serving capability to volumes
> volume server
>   type protocol/server
>   option transport-type tcp               # For TCP/IP transport
>   #
>   # Expose all the bricks to the clients
>   #
>   subvolumes gfs01brick1 gfs01brick2 gfs01brick3 gfs01brick4 gfs01brick5 gfs01brick6 gfs01brick7 gfs01brick8
>   option auth.addr.gfs01brick1.allow 10.0.0.*
>   option auth.addr.gfs01brick2.allow 10.0.0.*
>   option auth.addr.gfs01brick3.allow 10.0.0.*
>   option auth.addr.gfs01brick4.allow 10.0.0.*
>   option auth.addr.gfs01brick5.allow 10.0.0.*
>   option auth.addr.gfs01brick6.allow 10.0.0.*
>   option auth.addr.gfs01brick7.allow 10.0.0.*
>   option auth.addr.gfs01brick8.allow 10.0.0.*
> end-volume
>
> Client config:
>
> #
> # Add client feature and attach to remote subvolumes of gfs01
> #
> volume gfs01vol1
>   type protocol/client
>   option transport-type tcp               # for TCP/IP transport
>   option remote-host gfs01                # IP/DNS address of the remote volume
>   option remote-subvolume gfs01brick1     # name of the remote volume
>   option transport.socket.nodelay on      # undocumented option for speed
> end-volume
>
> volume gfs01vol2
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick2
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol3
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick3
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol4
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick4
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol5
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick5
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol6
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick6
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol7
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick7
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol8
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick8
>   option transport.socket.nodelay on
> end-volume
>
> #
> # Add client feature and attach to remote subvolumes of gfs02
> #
> volume gfs02vol1
>   type protocol/client
>   option transport-type tcp               # for TCP/IP transport
>   option remote-host gfs02                # IP/DNS address of the remote volume
>   option remote-subvolume gfs02brick1     # name of the remote volume
>   option transport.socket.nodelay on      # undocumented option for speed
> end-volume
>
> volume gfs02vol2
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick2
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol3
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick3
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol4
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick4
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol5
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick5
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol6
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick6
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol7
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick7
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol8
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick8
>   option transport.socket.nodelay on
> end-volume
>
> #
> # Replicate volumes
> #
> volume afr-vol1
>   type cluster/replicate
>   subvolumes gfs01vol1 gfs02vol1
> end-volume
>
> volume afr-vol2
>   type cluster/replicate
>   subvolumes gfs01vol2 gfs02vol2
> end-volume
>
> volume afr-vol3
>   type cluster/replicate
>   subvolumes gfs01vol3 gfs02vol3
> end-volume
>
> volume afr-vol4
>   type cluster/replicate
>   subvolumes gfs01vol4 gfs02vol4
> end-volume
>
> volume afr-vol5
>   type cluster/replicate
>   subvolumes gfs01vol5 gfs02vol5
> end-volume
>
> volume afr-vol6
>   type cluster/replicate
>   subvolumes gfs01vol6 gfs02vol6
> end-volume
>
> volume afr-vol7
>   type cluster/replicate
>   subvolumes gfs01vol7 gfs02vol7
> end-volume
>
> volume afr-vol8
>   type cluster/replicate
>   subvolumes gfs01vol8 gfs02vol8
> end-volume
>
> #
> # Distribute files across bricks
> #
> volume dht-vol
>   type cluster/distribute
>   subvolumes afr-vol1 afr-vol2 afr-vol3 afr-vol4 afr-vol5 afr-vol6 afr-vol7 afr-vol8
>   option min-free-disk 2%                 # 2% of 1.8TB volumes is 36GB
> end-volume
>
> #
> # Writebehind performance addition
> #
> volume writebehind
>   type performance/write-behind
>   subvolumes dht-vol
>   option flush-behind on                  # default value is 'off'
>   option cache-size 3MB
> end-volume
>
> The script is here, but I would need to include a bunch of things that get
> imported for it to run for you.
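[To make the stat-prefetch suggestion at the top of the thread concrete: it
would be loaded in this client file directly above write-behind, so that it
becomes the topmost translator. A minimal sketch only, assuming the
performance/stat-prefetch translator is available in the GlusterFS release in
use; the volume name here is illustrative:

volume statprefetch
  type performance/stat-prefetch
  subvolumes writebehind
end-volume

stat-prefetch mainly caches stat/lookup results gathered while reading
directories, which may also be where the slow recursive inotify registration
described later in the thread is spending its time.]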
> > > > import os > > import sys > > import time > > import getopt > > import signal > > import shutil > > import ConfigParser > > from ConfigParser import NoOptionError > > # > > # Get helpers > > # > > from loggerClass import loggerClass, loggerClassMixin > > import VESutils > > from VESutils import signon, getFromINI, blobpathfromblobid, > blobidfromblobpath > > from VESutils import getAccountIdFromBlobId, getMemberInfo > > from VESutils import epochtime2S3time > > from VESutils import elapsedTimeToString > > from fmt_wcommas import FMC > > from singleinstance import singleinstance > > from globalconfig import globalinifilename > > # > > # Get S3 connection class > > # > > from boto.s3.connection import S3Connection, S3ResponseError > > from boto.exception import S3ResponseError, S3CreateError, S3DataError > > > > import pyinotify > > #from pyinotify import WatchManager, Notifier, ProcessEvent > > from xattr import xattr > > # > > # Get postgreSQL DBI interface > > # > > import psycopg2 > > > > class Watcher(pyinotify.ProcessEvent, loggerClassMixin): > > ''' > > default maximum for max_user_watches is 8192 (FC5) > > set by logging in as root and entering following command: > > > > sysctl -w fs.inotify.max_user_watches=65536 > > ''' > > > > def __init__(self, conn, bucket, > > logf = None, _trace = False, _debug = 0, _dryrun = False): > > > > ''' > > MemberInfo - dictionary-like object holding cached websafe.members > info > > bucket = S3 bucket instance where new files are to be uploaded > > ''' > > self.conn = conn > > self.bucket = bucket > > self._trace = _trace > > self._debug = _debug > > self._dryrun = _dryrun > > #self.BACKUPS = '/mnt/BACKUPS/blobdata' > > self.BACKUPS = None > > # > > # Cache members_info as I go to save DB accesses > > # > > self.members_info = dict() > > # > > # If user passed loggerClass instance use it, otherwise logging > will > > # go to screen (as provided for by loggerClassMixin.LOGF). > > # > > if logf is not None: > > self.logf = logf > > > > pyinotify.ProcessEvent.__init__(self) > > self.len_eventq = 0 > > self.progress_count = 0 > > > > def process_PROGRESS(self, notifier): > > len_eventq = len(notifier._eventq) > > LOGF = self.LOGF > > LM = "PROGRESS" > > if self._trace: > > LOGF("T", LM, "Entering") > > LOGF("I", LM, "len_eventq=%s" % FMC(len_eventq)) > > > > self.progress_count += 1 > > # > > # If eventq is shorter than last time, notifier didn't call > > # .read_events() so I might need to do it here. This code needs a > > # second look because there are LONG pauses currently. > > # > > if len_eventq < self.len_eventq: > > # > > # It is too expensive to update the eventq on every file that > > # is processed, so I will do it on every 1000th file as a > > # compromise. > > # > > if (self.progress_count % 1000) == 0: > > notifier.read_events() > > > > self.len_eventq = len_eventq > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > > > def process_IN_Q_OVERFLOW(self, event): > > ''' > > process_IN_Q_OVERFLOW - this is fired when events queue overflows. 
> > ''' > > LM = 'process_IN_Q_OVERFLOW' > > LOGF = self.LOGF > > if self._trace: > > LOGF("T", LM, "Entering") > > > > LOGF("E", LM, "Queue overflow, set max_queued_events higher") > > LOGF("E", LM, "sudo /sbin/sysctl -w > fs.inotify.max_queued_events=#####") > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > raise OverflowError('len_eventq= %s' % FMC(self.self.len_eventq)) > > > > > > def process_IN_MOVED_TO(self, event): > > ''' > > process_IN_MOVED_TO - this is fired when upload .tmp file is moved > to > > its final resting place. > > ''' > > LM = 'process_IN_MOVED_TO' > > LOGF = self.LOGF > > if self._trace: > > LOGF("T", LM, "Entering") > > > > self._backup('M', event) > > self._upload('M', event) > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > > > def process_IN_DELETE(self, event): > > LM = 'process_IN_DELETE' > > LOGF = self.LOGF > > if self._trace: > > LOGF("T", LM, "Entering") > > > > self._delete(event) > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > > > def _delete(self, event): > > LM = '_delete' > > LOGF = self.LOGF > > if self._trace: > > LOGF("T", LM, "Entering") > > > > src = os.path.join(event.path, event.name) > > blob_id = blobidfromblobpath(src) > > if self._debug: > > LOGF("I", LM, "[%s] file=%s" % (event.name, src)) > > > > # > > # Make sure a .tmp file didn't trigger this event > > # > > if event.name.endswith('.tmp') or \ > > event.name.startswith('.'): > > > > if self._debug: > > LOGF("I", LM, ".tmp file, skipped") > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > return > > > > # > > # Also make sure that the file is a blob. blobs have 28 character > > # hex filenames. > > # > > if len(os.path.basename(event.name)) != 28: > > if self._debug: > > LOGF("I", LM, "%s non-blob file deletion skiped") > > return > > > > # > > # Make sure file hasn't "disappeared" > > # > > if not os.path.exists(src): > > LOGF("W", LM, "src=%s, disappeared, skipped" % src) > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > return > > > > # > > # Get S3path and delete the blob from S3 > > # > > S3path = blobpathfromblobid('/', blob_id) > > result = self.bucket.delete_key(S3path) > > # > > # See if I've got the information in file_data > > # > > email = None > > account_id = getAccountIdFromBlobId(self.conn, blob_id) > > if account_id is not None: > > member_info = self.members_info.get(account_id, > > getMemberInfo(self.conn, account_id = > account_id) > > ) > > > > if member_info is not None: > > email = member_info['email'] > > > > else: > > LOGF("W", LM, "blob_id=%s not found in file_data" % \ > > blob_id) > > > > LOGF("I", LM, "[D]blob_id=%s email=%s" % (blob_id, email)) > > # > > # If we are keeping local backup, delete it also > > # > > if self.BACKUPS is not None: > > dst = blobpathfromblobid(self.BACKUPS, blob_id) > > try: > > os.unlink(dst) > > > > except: # Ugly but it works! > > pass > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > def process_IN_ATTRIB(self, event): > > ''' > > process_IN_ATTRIB - this is fired when an blob file has an > attribute > > changed. Normally attributes won't change, but > > this can be used to trigger me to do an upload > > on a blob file after-the-fact and provides for > a > > 'self-healing' filesystem. 
> > ''' > > LM = 'process_IN_ATTRIB' > > LOGF = self.LOGF > > if self._trace: > > LOGF("T", LM, "Entering") > > > > self._backup('A', event) > > self._upload('A',event) > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > def _backup(self, parent, event): > > LM = '_backup' > > LOGF = self.LOGF > > if self._trace: > > LOGF("T", LM, "Entering") > > > > if self._debug > 1: > > LOGF("D", LM, "event.path=%s, event.name=%s" % \ > > (event.path, event.name)) > > > > if self.BACKUPS is not None: > > src = os.path.join(event.path, event.name) > > blob_id = blobidfromblobpath(src) > > if self._debug: > > LOGF("D", "[%s]" % parent, "src=%s" % src) > > > > # > > # Make sure a non-blob (.tmp/hidden) file didn't trigger this > event > > # > > if event.name.endswith('.tmp') or \ > > event.name.startswith('.') or \ > > len(os.path.basename(event.name)) != 28: > > > > if self._debug: > > LOGF("I", LM, "non-blob file, skipped") > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > return > > > > # > > # Copy the file to backup folder (iff it doesn't exist) > > # > > dst = blobpathfromblobid(self.BACKUPS, blob_id) > > if not os.path.exists(dst): > > try: > > shutil.copy2(src, dst) > > > > except: > > LOGF("E", LM, "%s->%s backup failed" % (src, dst)) > > ##raise > > > > else: > > if self._debug: > > LOGF("E", LM, "BACKUPS currently disabled, skipped") > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > > > def _upload(self, parent, event): > > LM = '_upload' > > LOGF = self.LOGF > > if self._trace: > > LOGF("T", LM, "Entering") > > > > if self._debug > 1: > > LOGF("D", LM, "event.path=%s, event.name=%s" % \ > > (event.path, event.name)) > > > > src = os.path.join(event.path, event.name) > > if self._debug: > > LOGF("D", "[%s]" % parent, "src=%s" % src) > > > > # > > # Make sure a .tmp file didn't trigger this event > > # > > if event.name.endswith('.tmp') or \ > > event.name.startswith('.'): > > > > if self._debug: > > LOGF("I", LM, ".tmp file, skipped") > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > return > > > > # > > # Also make sure that the file is a blob. blobs have 28 character > > # hex filenames. > > # > > if len(os.path.basename(event.name)) != 28: > > if self._debug: > > LOGF("I", LM, "%s non-blob file skiped") > > return > > > > # > > # Make sure file hasn't "disappeared" > > # > > if not os.path.exists(src): > > LOGF("W", LM, "src=%s, not found, skipped" % src) > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > return > > > > # > > # See if I've got the information in file_data > > # > > blob_id = blobidfromblobpath(src) > > email = None > > account_id = getAccountIdFromBlobId(self.conn, blob_id) > > if account_id is not None: > > member_info = self.members_info.get(account_id, > > getMemberInfo(self.conn, account_id = > account_id) > > ) > > > > if member_info is not None: > > email = member_info['email'] > > > > > > else: > > LOGF("W", LM, "blob_id=%s not found in file_data" % \ > > blob_id) > > > > S3path = blobpathfromblobid('/', blob_id) > > size = os.path.getsize(src) > > # > > # Create a new key instance for S3 and pass in the > > # meta-information > > # > > k = self.bucket.new_key(key_name = S3path) > > # > > # Meta-data needed to restore a file from S3 to local filesystem > > # (e.g. 
to set ctime, mtime properly to work with rsync) > > # > > ctime = os.path.getctime(src) > > mtime = os.path.getmtime(src) > > VESctime = epochtime2S3time(ctime) > > VESmtime = epochtime2S3time(mtime) > > if self._debug > 1: > > LOGF("D", LM, "setting VESctime=%s" % VESctime) > > LOGF("D", LM, "setting VESmtime=%s" % VESmtime) > > > > k.set_metadata('ctime', VESctime) > > k.set_metadata('mtime', VESmtime) > > age = time.time() - ctime > > LOGF("I", LM, "[%s]%-28s %s(%11s)[%s]" % \ > > (parent, > > email[:28], > > blob_id, > > FMC(size), > > elapsedTimeToString(age) > > ) > > ) > > > > if not self._dryrun: > > # > > # Upload the file to S3. Use replace=False to short circuit > > # the upload if the file already exists. That way I'm not > using > > # upload bandwidth unnecessarily. > > # > > k.set_contents_from_filename(src, replace = False) > > if self._trace: > > LOGF("I", LM, "done") > > > > if self._trace: > > LOGF("T", LM, "Leaving") > > > > > > def sigTERM(signal, frame): > > global stopnow > > stopnow = True > > > > def notifyCallback(notifier): > > global _trace, LOGF > > global stopnow > > LM = 'notifyCallback' > > if stopnow: > > notifier.stop() > > > > > > def conlog(severity, LM, msg): > > global _trace, _debug, _quiet, LOGF > > if msg: > > LOGF(severity, "%s-%s" % (LM,msg)) > > > > if not _quiet: > > sys.stdout.write(msg + '\n') > > sys.stdout.flush() > > > > pgm_ = 'monitorblobs' > > ver_ = '1.1.1.0' > > LM = 'main' > > # > > # V1.0.0.1 13 JUL 08 LAB Wrapped call to S3 to get HEAD after uploading (to > > # access last_modified meta-data) in while-try block > > # to work around boto issue #125) > > # > > # V1.0.0.2 13 JUL 08 LAB Added .strip() to arguments to work with > > # supervisord > > # > > # V1.0.0.3 14 JUL 08 LAB Added start program separator to logfile, fixed > > # typo (Xattrs instead of xattrs) > > # > > # V1.0.0.4 14 JUL 08 LAB Wrapped all boto calls inside _mexe framework > > # as a workaround to the bug #125. > > # > > # V1.0.0.5 15 JUL 08 LAB Removed the IN_MOVED_TO handler because every > upload > > # triggered the IN_ATTRIB handler as well. > > # > > # V1.1.0.0 17 JUL 08 LAB Upgrded to V0.8 of pyinotify, replaced hard-coded > > # checking of the inotify variables with newly > > # provided SysCtlNotify class. > > # > > # V1.1.0.1 26 JUL 08 LAB Added hack to log len(_eventq) for debugging > > # > > # V1.1.0.2 29 JUL 08 LAB Added dbTableCache so I can display member info > > # in logfile as files are processed (unfinished > until > > # fileid xattr is set from upload processing code). > > > > # V1.1.0.3 07 MAR 09 LAB Introduced getFromINI and _<runtimeparm> naming, > > # removed _mexe from class because it is now inside > boto > > # > > # V1.1.0.4 12 MAR 09 LAB Shortened some logging messages, put others under > > # _trace control > > # > > # V1.1.0.5 30 MAR 09 LAB Added code to _upload that skips files that start > > # with a period (.). These files are generated when > > # doing an rsync recovery on a monitored branch. > > # > > # V1.1.0.6 11 DEC 09 LAB Introduced the VES_STORAGE_S3 environment variable > > # that points to bucket_name > > # > > # V1.1.0.7 27 DEC 09 LAB Added copy to backup method to Watcher class (this > > # is temporary because I'm moving to GlusterFS auto > > # replication). > > # > > # V1.1.0.8 04 JAN 10 LAB Cleaned up upload code, eliminated xattr handlers > > # > > # V1.1.0.9 10 JAN 10 LAB Better logging for _delete, _upload. Cleaned up > > # code for making/skipping backups (which won't be > > # needed when I switch to GlusterFS). 
> > # > > # V1.1.1.0 23 JAN 10 LAB Introduced globalconfig, removed BACKUPS because > > # of cutover to GlusterFS > > # > > # Register the signal handler > > # > > signal.signal(signal.SIGTERM, sigTERM) > > stopnow = False > > > > PGM_usage=''' > > Usage: %s [OPTIONS] > > Monitor blobdata folders for changes and process files > > > > -t, --trace run in trace mode > > -d, --debug= run at debug level= (e.g 1,2,3) > > -q, --quiet run in quiet mode (minimal output) > > -D, --dryrun dry run, no changes > > -l, --logfilename= specify the path/filename to .LOG file > > -i, --inifilename= specify the path/filename to .INI file > > -h, --help help (this screen) > > -V, --version output version information and stop > > ''' % pgm_ > > > > # > > # Get the options that the user may have passed on processor call line > > # > > short_args = "td:ql:i:hVD" > > long_args = ["trace", "debug=", "quiet", "logfilename=", > > "inifilename=", "help", "version", "dryrun", > > ] > > > > # > > # Process command line options > > # > > opts, args = getopt.gnu_getopt(sys.argv[1:], short_args, long_args) > > # > > # Set inital values for potential argv parameters > > # > > _trace = False > > _debug = 0 > > _quiet = False > > _dryrun = False > > _logfilename = None > > _inifilename = None > > # > > # Parse command line options > > # > > for option, value in opts: > > if option in ("-h", "--help"): > > sys.exit(PGM_usage) > > > > if option in ("-V", "--version"): > > print '%s Version %s' % (pgm_, ver_) > > sys.exit(0) > > > > if option in ("-t", "--trace"): _trace = True > > elif option in ("-d", "--debug"): _debug = int(value) > > elif option in ("-q", "--quiet"): _quiet = 1 > > elif option in ("-D", "--dryrun"): _dryrun = True > > elif option in ("-l", "--logfilename"): _logfilename = value.strip() > > elif option in ("-i", "--inifilename"): _inifilename = value.strip() > > > > # > > # If user didn't specify inifilename on processor call, use default > > # > > if _inifilename is None: > > _inifilename = "%s.ini" % pgm_ > > > > if not os.path.exists(globalinifilename): > > msg = "inifilename=%s, not found, aborting" % globalinifilename > > raise RuntimeError(msg) > > > > if not os.path.exists(_inifilename): > > msg = "inifilename=%s, not found, aborting" % _inifilename > > #print "%s-%s.%s" % ("E", LM, msg) > > raise RuntimeError(msg) > > > > # > > # Create ConfigParser instance to read .INI information > > # > > INI = ConfigParser.ConfigParser() > > # > > # Read .INI file > > # > > INI.read([globalinifilename, _inifilename]) > > > > _logfilename = getFromINI(INI.get, 'init', 'logfilename', > > _logfilename, "%s.log" % pgm_) > > > > logf = loggerClass(_logfilename, 'monitor', 1<<26) #64Mb max > > logf.initsessionlog() > > LOGF = logf.writelines > > LOGF("I", ("------%s V%s begin" % (pgm_, ver_)).ljust(50, '-')) > > # > > # Make sure there isn't another copy of me running already > > # > > _pidPath = getFromINI(INI.get, 'init', 'pidPath', None, None) > > myapp = singleinstance(pgm_, _pidPath) > > if myapp.alreadyrunning(): > > msg = "%s already running, exiting" % pgm_ > > raise RuntimeError(msg) > > > > > > _trace = getFromINI(INI.getboolean, 'init', 'trace', _trace, False) > > _debug = getFromINI(INI.getint, 'init', 'debug', _debug, 0) > > _quiet = getFromINI(INI.getboolean, 'init', 'quiet', _quiet, False) > > _dryrun = getFromINI(INI.getboolean, 'init', 'dryrun', _dryrun, False) > > > > signon(pgm_, ver_, _quiet=_quiet) > > # > > # More items to get from the .INI file (or environment) > > # > > _STORAGE = 
getFromINI(INI.get, 'init', 'storage', None, None) > > _bucketname = getFromINI(INI.get, 'init', 'bucketname', None, None) > > > > # > > # Get files from .INI to read AWS credentials from > > # > > _accessKeyFile = getFromINI(INI.get, 'init', 'accesskeyfile', None, None) > > _secretKeyFile = getFromINI(INI.get, 'init', 'secretkeyfile', None, None) > > > > if _debug: > > conlog("I", LM, "-----Options".ljust(50, '-')) > > conlog("I", LM, "trace..........%s" % _trace) > > conlog("I", LM, "debug..........%i" % _debug) > > conlog("I", LM, "quiet..........%s" % _quiet) > > conlog("I", LM, "dryrun.........%s" % _dryrun) > > conlog("I", LM, "STORAGE........%s" % _STORAGE) > > conlog("I", LM, "pidPath........%s" % _pidPath) > > conlog("I", LM, "bucketname.....%s" % _bucketname) > > conlog("I", LM, "accessKeyFile..%s" % _accessKeyFile) > > conlog("I", LM, "secretKeyFile..%s" % _secretKeyFile) > > > > _PWMfile = getFromINI(INI.get, 'init', 'pwmfile', None, None) > > _host = getFromINI(INI.get, 'database', 'host', None, None) > > _database = getFromINI(INI.get, 'database', 'database', None, None) > > _dbport = getFromINI(INI.getint, 'database', 'port', None, None) > > _user = getFromINI(INI.get, 'database', 'user', None, None) > > > > conlog("I", LM, "PWMfile........%s" % _PWMfile) > > conlog("I", LM, "host...........%s" % _host) > > conlog("I", LM, "database.......%s" % _database) > > conlog("I", LM, "dbport.........%i" % _dbport) > > conlog("I", LM, "user...........%s" % _user) > > > > if not _quiet: > > print > > > > # > > # Get database password from file > > # > > _password = open(_PWMfile, 'r').readline().rstrip() > > conn = psycopg2.connect(host=_host, database=_database, port=_dbport, > > user=_user, password=_password) > > # > > # Get the AccessKey and SecretAccessKey info > > # > > aws_ak = open(_accessKeyFile,'r').readline().rstrip() > > aws_sak = open(_secretKeyFile,'r').readline().rstrip() > > # > > # Create an instance of boto S3Connection using these credentials > > # > > S3obj = S3Connection(aws_ak, aws_sak) > > if _trace: > > conlog("T", LM, "S3 connection object created") > > conlog("T", LM, "Retrieving bucketname=%s from S3" % _bucketname) > > > > bucket = S3obj.get_bucket(_bucketname) > > if _trace: > > conlog("T", LM, "bucketname = %s, retrieved" % _bucketname) > > > > # > > # Watch for move/delete events > > # > > #mask = pyinotify.IN_DELETE | pyinotify.IN_ATTRIB | pyinotify.IN_Q_OVERFLOW > > mask = pyinotify.IN_ATTRIB > > mask |= pyinotify.IN_Q_OVERFLOW > > mask |= pyinotify.IN_MOVED_TO > > # > > # Create instance of WatchManager class and notifier class > > # > > wm = pyinotify.WatchManager() > > if _trace: > > conlog("T", LM, "Creating Watcher instance") > > > > Wobj = Watcher(conn, bucket, logf = logf, > > _trace = _trace, _debug = _debug, _dryrun = _dryrun) > > > > if _trace: > > conlog("T", LM, "Watcher instance created") > > conlog("T", LM, "Creating Notifier instance") > > > > notifier = pyinotify.Notifier(wm, Wobj) > > > > if _trace: > > conlog("T", LM, "Notifier instance created") > > > > # > > # If I'm debugging, get a loggerClass instance into notifier class instance > > # to log _eventq depth. > > # > > if _debug: > > notifier.LOGF = logf.writelines > > > > if not _quiet: > > print > > # > > # Folders to watch (this way I won't watch any folders except the ones > > # that actually hold blobs 00-ff->00-ff). This keeps me from watching > > # temp/junk folders that might accidentally get created. 
> > # > > ##flist = ['%02x' % i for i in xrange(0, 256)] #00-ff > > flist = ['%02x' % i for i in xrange(int('00',16), int('ff',16) + 1)] > > conlog("I", LM, "Watchlist STORAGE..%s (recursive)" % _STORAGE) > > conlog("I", LM, "Registering folders to watch...") > > foldersRegistered = 0 > > for n, i in enumerate(flist): > > l = n + 1 > > if (n % 16) == 0: > > if not _quiet: > > if n != 0: > > sys.stdout.write('(recursive)\n') > > > > sys.stdout.write("Watchlist adding....%s " % i) > > sys.stdout.flush() > > > > else: > > if not _quiet: > > sys.stdout.write("%s " % i) > > sys.stdout.flush() > > > > for j in flist: > > watch_folder = os.path.join(_STORAGE, i, j) > > wm.add_watch(watch_folder, mask) > > foldersRegistered +=1 > > > > if not _quiet: > > sys.stdout.write('(recursive)\n') > > print > > > > conlog("I", LM, "%s folder monitors registered" % FMC(foldersRegistered)) > > if _trace: > > conlog("T", LM, "Entering notifier.loop") > > > > try: > > notifier.loop(callback = notifyCallback) > > > > except KeyboardInterrupt: > > print "KeyboardInterrupt, stopping..." > > stopnow = True > > # > > # destroy the inotify's instance on this interrupt (stop monitoring) > > # > > notifier.stop() > > > > del myapp > > if _dryrun: > > conlog("I", LM, "WARNING-dryrun = True, nothing committed") > > > > > > Hope this helps. > > > > -Larry > > > > > > *From:* harshavardhanacool at gmail.com [mailto:harshavardhanacool at gmail.com] > *On Behalf Of *Harshavardhana > *Sent:* Tuesday, January 26, 2010 2:01 AM > *To:* Larry Bates > *Cc:* gluster-users at gluster.org > *Subject:* Re: GlusterFS and inotify (more info) > > > > Hi Larry, > > Can you share with us the volume files you are using with GlusterFS?. > Also the scripts you are trying to run. > > Thanks > -- > Harshavardhana > Gluster - http://www.gluster.com > > On Tue, Jan 26, 2010 at 3:31 AM, Larry Bates <larry.bates at vitalesafe.com> > wrote: > > Well it seems that I can register, but the registration of the subfolders > is so > slow that I thought it was not working. I have subfolders 00-ff and > subfolders > under them 00-ff (64K total folder structure). Registering on normal > storage > took about 30 seconds. Registering inotify watcher (recursive=True) on > GlusterFS mount takes over 1 hr, 15 min! Walking the tree and registering > them > each individually takes 6 minutes. > > -Larry > > -----Original Message----- > From: Larry Bates [mailto:larry.bates at vitalesafe.com] > Sent: Monday, January 25, 2010 1:51 PM > To: 'gluster-users at gluster.org' > Subject: GlusterFS and inotify > > I recently moved my backend storage to GlusterFS V3.0 and with one > exception > everything is running great. That exception is that I had a daemon using > inotify that was watching my storage for new files. Upon arrival this > watcher > uploaded a copy of the file to Amazon S3. This daemon had been running > just > fine for well over a year. Moving to GlusterFS seems to indicate that > inotify > doesn't work on GlusterFS volumes. I don't know if it is a Gluster, Fuse, > or > some other problem. inotify just refuses to allow me to register the top > level > folder to be watched. Before I spend a lot of time on this, I thought I'd > bounce it off of the "experts" on this list. > > Anyone have any ideas? > > Thanks in advance, > Larry Bates > vitalEsafe, Inc. > > _______________________________________________ > Gluster-users mailing list > Gluster-users at gluster.org > http://gluster.org/cgi-bin/mailman/listinfo/gluster-users > > >
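A closing note on the watch-registration timings reported above. Below is a
minimal, self-contained sketch of the two approaches being compared --
pyinotify's recursive auto-discovery versus explicitly adding one watch per
known blob directory. The mount point is hypothetical and the 00-ff/00-ff
layout is taken from Larry's description; treat it as an illustration, not as
the production script.

import os
import pyinotify

wm = pyinotify.WatchManager()
mask = pyinotify.IN_MOVED_TO | pyinotify.IN_ATTRIB | pyinotify.IN_Q_OVERFLOW

top = '/mnt/glusterfs/blobdata'   # hypothetical mount point, for illustration only

# 1) Recursive registration: pyinotify discovers and stat()s every directory
#    under 'top' itself. On a FUSE/GlusterFS mount each of those lookups can be
#    a network round trip, which is consistent with the 1 hr 15 min figure
#    reported above.
# wm.add_watch(top, mask, rec=True)

# 2) Explicit registration over the known 00-ff/00-ff layout: no recursive
#    walk, and stray temp/junk directories are never watched.
hexdirs = ['%02x' % n for n in xrange(256)]
for i in hexdirs:
    for j in hexdirs:
        wm.add_watch(os.path.join(top, i, j), mask)

Either way, 65,536 watches requires raising fs.inotify.max_user_watches, as the
docstring in the script above already notes.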