Sounds good to me. If it breaks anything, we can always just reinstall the
mirrormanager2-mirrorlist package to return to the current version.

With kind regards,
Patrick Uiterwijk
Fedora Infra

----- Original Message -----
> As there are more reports of timeouts connected to the mirrorlist
> servers, I had a closer look at the code. It seems we are hitting the
> default value of max_children = 40. This freeze break request increases
> it to 80 via a hotfix. See the commit messages below for further
> details.
>
> Adrian
>
>
> commit 2b8e284e687d097c2cad6cd24dc3db3aa23f837d
> Author: Adrian Reber <adrian@xxxxxxxx>
> Date:   Thu Oct 15 14:03:02 2015 +0000
>
>     Increase the number of possible child processes
>
>     The mirrorlist-server is the process which has the mirrorlist data
>     loaded and which is accessed by the public-facing
>     mirrorlist_client.wsgi. The mirrorlist-server uses the
>     ForkingUnixStreamServer, which has a default of max_children = 40.
>     (https://hg.python.org/cpython/file/2.7/Lib/SocketServer.py#l516)
>
>     Looking at the code of ForkingUnixStreamServer, it says at
>
>     https://hg.python.org/cpython/file/2.7/Lib/SocketServer.py#l523
>
>     # If we're above the max number of children, wait and reap them until
>     # we go back below threshold. Note that we use waitpid(-1) below to be
>     # able to collect children in size(<defunct children>) syscalls instead
>     # of size(<children>): the downside is that this might reap children
>     # which we didn't spawn, which is why we only resort to this when we're
>     # above max_children.
>
>     As we are running the wsgi with processes=45, this sounds like it can
>     lead to a situation where it might just hang.
>
>     This increases max_children to 80; processes could perhaps also be
>     raised to 60 or 70.
>
>     Signed-off-by: Adrian Reber <adrian@xxxxxxxx>
>
> diff --git a/files/hotfix/mirrorlist/mirrorlist_server.py b/files/hotfix/mirrorlist/mirrorlist_server.py
> index 2d98fa0..2b81912 100644
> --- a/files/hotfix/mirrorlist/mirrorlist_server.py
> +++ b/files/hotfix/mirrorlist/mirrorlist_server.py
> @@ -938,6 +938,7 @@ def sigterm_handler(signum, frame):
>
>  class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer):
>      request_queue_size = 300
> +    max_children = 80
>      def finish_request(self, request, client_address):
>          signal.signal(signal.SIGHUP, signal.SIG_IGN)
>          BaseServer.finish_request(self, request, client_address)
> diff --git a/roles/mirrormanager/mirrorlist2/tasks/main.yml b/roles/mirrormanager/mirrorlist2/tasks/main.yml
> index 786432f..2e5f5ee 100644
> --- a/roles/mirrormanager/mirrorlist2/tasks/main.yml
> +++ b/roles/mirrormanager/mirrorlist2/tasks/main.yml
> @@ -73,3 +73,12 @@
>  # tags:
>  # - mirrorlist2
>  # - selinux
> +
> +
> +- name: HOTFIX - increase the number of possible child processes
> +  copy: src="{{ files }}/hotfix/mirrorlist/mirrorlist_server.py" dest=/usr/share/mirrormanager2/mirrorlist_server.py
> +        owner=root group=root mode=0755
> +  tags:
> +  - hotfix
> +  notify:
> +  - restart mirrorlist-server
>
> commit d04588274a6e161a36d8dbd9c91ea0fe78de017e
> Author: Adrian Reber <adrian@xxxxxxxx>
> Date:   Thu Oct 15 13:54:53 2015 +0000
>
>     Original mirrorlist_server.py which needs to be hotfixed
>
>     Signed-off-by: Adrian Reber <adrian@xxxxxxxx>
>
> diff --git a/files/hotfix/mirrorlist/mirrorlist_server.py b/files/hotfix/mirrorlist/mirrorlist_server.py
> new file mode 100644
> index 0000000..2d98fa0
> --- /dev/null
> +++ b/files/hotfix/mirrorlist/mirrorlist_server.py
> @@ -0,0 +1,1136 @@
> +#!/usr/bin/python
> +#
> +# Copyright (c) 2007-2013
Dell, Inc. > +# by Matt Domsch <Matt_Domsch@xxxxxxxx> > +# Licensed under the MIT/X11 license > + > +# standard library modules in alphabetical order > +from collections import defaultdict > +import datetime > +import getopt > +import logging > +import logging.handlers > +import os > +import random > +import cPickle as pickle > +import select > +import signal > +import socket > +from SocketServer import (StreamRequestHandler, ForkingMixIn, > + UnixStreamServer, BaseServer) > +import sys > +from string import zfill, atoi > +import time > +import traceback > + > +try: > + import threading > +except ImportError: > + import dummy_threading as threading > + > +# not-so-standard library modules that this program needs > +from IPy import IP > +import GeoIP > +import radix > +from weighted_shuffle import weighted_shuffle > + > +# can be overridden on the command line > +pidfile = '/var/run/mirrormanager/mirrorlist_server.pid' > +socketfile = '/var/run/mirrormanager/mirrorlist_server.sock' > +cachefile = '/var/lib/mirrormanager/mirrorlist_cache.pkl' > +internet2_netblocks_file = '/var/lib/mirrormanager/i2_netblocks.txt' > +global_netblocks_file = '/var/lib/mirrormanager/global_netblocks.txt' > +logfile = None > +debug = False > +must_die = False > +# at a point in time when we're no longer serving content for versions > +# that don't use yum prioritymethod=fallback > +# (e.g. after Fedora 7 is past end-of-life) > +# then we can set this value to True > +# this only affects results requested using path=... > +# for dirs which aren't repositories (such as iso/) > +# because we don't know the Version associated with that dir here. > +default_ordered_mirrorlist = False > + > +gipv4 = None > +gipv6 = None > + > +# key is strings in tuple (repo.prefix, arch) > +mirrorlist_cache = {} > + > +# key is directory.name, returns keys for mirrorlist_cache > +directory_name_to_mirrorlist = {} > + > +# key is an IPy.IP structure, value is list of host ids > +host_netblock_cache = {} > + > +# key is hostid, value is list of countries to allow > +host_country_allowed_cache = {} > + > +repo_arch_to_directoryname = {} > + > +# redirect from a repo with one name to a repo with another > +repo_redirect = {} > +country_continent_redirect_cache = {} > + > +# our own private copy of country_continents to be edited > +country_continents = GeoIP.country_continents > + > +disabled_repositories = {} > +host_bandwidth_cache = {} > +host_country_cache = {} > +host_max_connections_cache = {} > +file_details_cache = {} > +hcurl_cache = {} > +asn_host_cache = {} > +internet2_tree = radix.Radix() > +global_tree = radix.Radix() > +host_netblocks_tree = radix.Radix() > +netblock_country_tree = radix.Radix() > +location_cache = {} > +netblock_country_cache = {} > + > +## Set up our syslog data. 
> +syslogger = logging.getLogger('mirrormanager') > +syslogger.setLevel(logging.INFO) > +handler = logging.handlers.SysLogHandler( > + address='/dev/log', > + facility=logging.handlers.SysLogHandler.LOG_LOCAL4) > +syslogger.addHandler(handler) > + > + > +def lookup_ip_asn(tree, ip): > + """ @t is a radix tree > + @ip is an IPy.IP object which may be contained in an entry in l > + """ > + node = tree.search_best(ip.strNormal()) > + if node is None: > + return None > + return node.data['asn'] > + > + > +def uniqueify(seq, idfun=None): > + # order preserving > + if idfun is None: > + def idfun(x): return x > + seen = {} > + result = [] > + for item in seq: > + marker = idfun(item) > + # in old Python versions: > + # if seen.has_key(marker) > + # but in new ones: > + if marker in seen: continue > + seen[marker] = 1 > + result.append(item) > + return result > + > + > +##### Metalink Support ##### > + > +def metalink_header(): > + # fixme add alternate format pubdate when specified > + pubdate = datetime.datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S > GMT") > + doc = '' > + doc += '<?xml version="1.0" encoding="utf-8"?>\n' > + doc += '<metalink version="3.0" xmlns="http://www.metalinker.org/"' > + doc += ' type="dynamic"' > + doc += ' pubdate="%s"' % pubdate > + doc += ' generator="mirrormanager"' > + doc += ' xmlns:mm0="http://fedorahosted.org/mirrormanager"' > + doc += '>\n' > + return doc > + > + > +def metalink_failuredoc(message=None): > + doc = metalink_header() > + if message is not None: > + doc += '<!--\n' > + doc += message + '\n' > + doc += '-->\n' > + doc += '</metalink>\n' > + return doc > + > + > +def metalink_file_not_found(directory, file): > + message = '%s/%s not found or has no metalink' % (directory, file) > + return metalink_failuredoc(message) > + > + > +def metalink(cache, directory, file, hosts_and_urls): > + preference = 100 > + try: > + fdc = file_details_cache[directory] > + detailslist = fdc[file] > + except KeyError: > + return ('metalink', 404, metalink_file_not_found(directory, file)) > + > + def indent(n): > + return ' ' * n * 2 > + > + doc = metalink_header() > + doc += indent(1) + '<files>\n' > + doc += indent(2) + '<file name="%s">\n' % (file) > + y = detailslist[0] > + > + def details(y, indentlevel=2): > + doc = '' > + if y['timestamp'] is not None: > + doc += indent(indentlevel+1) \ > + + '<mm0:timestamp>%s</mm0:timestamp>\n' % y['timestamp'] > + if y['size'] is not None: > + doc += indent(indentlevel+1) + '<size>%s</size>\n' % y['size'] > + doc += indent(indentlevel+1) + '<verification>\n' > + hashes = ('md5', 'sha1', 'sha256', 'sha512') > + for h in hashes: > + if y[h] is not None: > + doc += indent(indentlevel+2) \ > + + '<hash type="%s">%s</hash>\n' % (h, y[h]) > + doc += indent(indentlevel+1) + '</verification>\n' > + return doc > + > + doc += details(y, 2) > + # there can be multiple files > + if len(detailslist) > 1: > + doc += indent(3) + '<mm0:alternates>\n' > + for y in detailslist[1:]: > + doc += indent(4) + '<mm0:alternate>\n' > + doc += details(y,5) > + doc += indent(4) + '</mm0:alternate>\n' > + doc += indent(3) + '</mm0:alternates>\n' > + > + doc += indent(3) + '<resources maxconnections="1">\n' > + for (hostid, hcurls) in hosts_and_urls: > + private = '' > + if hostid not in cache['global']: > + private = 'mm0:private="True"' > + for url in hcurls: > + protocol = url.split(':')[0] > + # FIXME January 2010 > + # adding protocol= here is not part of the Metalink 3.0 spec, > + # but MirrorManager 1.2.6 used it accidentally, as did > + # yum 
3.2.20-3 as released in Fedora 8, 9, and 10. After those > + # three are EOL (~January 2010), the extra protocol= can be > + # removed. > + doc += indent(4) + \ > + '<url protocol="%s" type="%s" location="%s" '\ > + 'preference="%s" %s>' % ( > + protocol, protocol, host_country_cache[hostid].upper(), > + preference, private) > + doc += url > + doc += '</url>\n' > + preference = max(preference-1, 1) > + doc += indent(3) + '</resources>\n' > + doc += indent(2) + '</file>\n' > + doc += indent(1) + '</files>\n' > + doc += '</metalink>\n' > + return ('metalink', 200, doc) > + > + > +def tree_lookup(tree, ip, field, maxResults=None): > + # fast lookup in the tree; if present, find all the matching values by > + # deleting the found one and searching again this is safe w/o copying > + # the tree again only because this is the only place the tree is used, > + # and we'll get a new copy of the tree from our parent the next time it > + # fork()s. > + # returns a list of tuples (prefix, data) > + result = [] > + len_data = 0 > + if ip is None: > + return result > + node = tree.search_best(ip.strNormal()) > + while node is not None: > + prefix = node.prefix > + if type(node.data[field]) == list: > + len_data += len(node.data[field]) > + else: > + len_data += 1 > + t = (prefix, node.data[field],) > + result.append(t) > + if maxResults is None or len_data < maxResults: > + tree.delete(prefix) > + node = tree.search_best(ip.strNormal()) > + else: > + break > + return result > + > + > +def trim_by_client_country(s, clientCountry): > + if clientCountry is None: > + return s > + r = s.copy() > + for hostid in s: > + if hostid in host_country_allowed_cache and \ > + clientCountry not in host_country_allowed_cache[hostid]: > + r.remove(hostid) > + return r > + > + > +def shuffle(s): > + l = [] > + for hostid in s: > + item = (host_bandwidth_cache[hostid], hostid) > + l.append(item) > + newlist = weighted_shuffle(l) > + results = [] > + for (bandwidth, hostid) in newlist: > + results.append(hostid) > + return results > + > + > +continents = {} > + > +def handle_country_continent_redirect(): > + new_country_continents = GeoIP.country_continents > + for country, continent in country_continent_redirect_cache.iteritems(): > + new_country_continents[country] = continent > + global country_continents > + country_continents = new_country_continents > + > + > +def setup_continents(): > + new_continents = defaultdict(list) > + handle_country_continent_redirect() > + for c, continent in country_continents.iteritems(): > + new_continents[continent].append(c) > + global continents > + continents = new_continents > + > + > +def do_global(kwargs, cache, clientCountry, header): > + c = trim_by_client_country(cache['global'], clientCountry) > + header += 'country = global ' > + return (header, c) > + > + > +def do_countrylist(kwargs, cache, clientCountry, requested_countries, > header): > + > + def collapse(d): > + """ collapses a dict {key:set(hostids)} into a set of hostids """ > + s = set() > + for country, hostids in d.iteritems(): > + for hostid in hostids: > + s.add(hostid) > + return s > + > + country_cache = {} > + for c in requested_countries: > + if c in cache['byCountry']: > + country_cache[c] = cache['byCountry'][c] > + header += 'country = %s ' % c > + s = collapse(country_cache) > + s = trim_by_client_country(s, clientCountry) > + return (header, s) > + > + > +def get_same_continent_countries(clientCountry, requested_countries): > + result = [] > + for r in requested_countries: > + if r in country_continents: > 
+ requestedCountries = [ > + c.upper() for c in continents[country_continents[r]] > + if c != clientCountry] > + result.extend(requestedCountries) > + result = uniqueify(result) > + return result > + > + > +def do_continent(kwargs, cache, clientCountry, requested_countries, header): > + if len(requested_countries) > 0: > + rc = requested_countries > + else: > + rc = [clientCountry] > + clist = get_same_continent_countries(clientCountry, rc) > + return do_countrylist(kwargs, cache, clientCountry, clist, header) > + > + > +def do_country(kwargs, cache, clientCountry, requested_countries, header): > + if 'GLOBAL' in requested_countries: > + return do_global(kwargs, cache, clientCountry, header) > + return do_countrylist( > + kwargs, cache, clientCountry, requested_countries, header) > + > + > +def do_netblocks(kwargs, cache, header): > + hostresults = set() > + if not kwargs.has_key('netblock') or kwargs['netblock'] == "1": > + tree_results = tree_lookup(host_netblocks_tree, kwargs['IP'], > 'hosts') > + for (prefix, hostids) in tree_results: > + for hostid in hostids: > + if hostid in cache['byHostId']: > + hostresults.add((prefix, hostid,)) > + header += 'Using preferred netblock ' > + return (header, hostresults) > + > + > +def do_internet2(kwargs, cache, clientCountry, header): > + hostresults = set() > + ip = kwargs['IP'] > + if ip is None: > + return (header, hostresults) > + asn = lookup_ip_asn(internet2_tree, ip) > + if asn is not None: > + header += 'Using Internet2 ' > + if clientCountry is not None \ > + and clientCountry in cache['byCountryInternet2']: > + hostresults = cache['byCountryInternet2'][clientCountry] > + hostresults = trim_by_client_country(hostresults, clientCountry) > + return (header, hostresults) > + > + > +def do_asn(kwargs, cache, header): > + hostresults = set() > + ip = kwargs['IP'] > + if ip is None: > + return (header, hostresults) > + asn = lookup_ip_asn(global_tree, ip) > + if asn is not None and asn in asn_host_cache: > + for hostid in asn_host_cache[asn]: > + if hostid in cache['byHostId']: > + hostresults.add(hostid) > + header += 'Using ASN %s ' % asn > + return (header, hostresults) > + > + > +def do_geoip(kwargs, cache, clientCountry, header): > + hostresults = set() > + if clientCountry is not None and clientCountry in cache['byCountry']: > + hostresults = cache['byCountry'][clientCountry] > + header += 'country = %s ' % clientCountry > + hostresults = trim_by_client_country(hostresults, clientCountry) > + return (header, hostresults) > + > + > +def do_location(kwargs, header): > + hostresults = set() > + if 'location' in kwargs and kwargs['location'] in location_cache: > + hostresults = set(location_cache[kwargs['location']]) > + header += "Using location %s " % kwargs['location'] > + return (header, hostresults) > + > + > +def append_path(hosts, cache, file, pathIsDirectory=False): > + """ given a list of hosts, return a list of objects: > + [(hostid, [hcurls]), ... 
] > + in the same order, appending file if it's not None""" > + subpath = None > + results = [] > + if 'subpath' in cache: > + subpath = cache['subpath'] > + for hostid in hosts: > + hcurls = [] > + for hcurl_id in cache['byHostId'][hostid]: > + s = hcurl_cache[hcurl_id] > + if subpath is not None: > + s += "/" + subpath > + if file is None and pathIsDirectory: > + s += "/" > + if file is not None: > + if not s.endswith('/'): > + s += "/" > + s += file > + hcurls.append(s) > + results.append((hostid, hcurls)) > + return results > + > + > +def trim_to_preferred_protocols(hosts_and_urls): > + """ remove all but http and ftp URLs, > + and if both http and ftp are offered, > + leave only http. Return [(hostid, url), ...] """ > + results = [] > + try_protocols = ('https', 'http', 'ftp') > + for (hostid, hcurls) in hosts_and_urls: > + protocols = {} > + url = None > + for hcurl in hcurls: > + for p in try_protocols: > + if hcurl.startswith(p+':'): > + protocols[p] = hcurl > + > + for p in try_protocols: > + if p in protocols: > + url = protocols[p] > + break > + > + if url is not None: > + results.append((hostid, url)) > + return results > + > + > +def client_ip_to_country(ip): > + clientCountry = None > + if ip is None: > + return None > + > + # lookup in the cache first > + tree_results = tree_lookup( > + netblock_country_tree, ip, 'country', maxResults=1) > + if len(tree_results) > 0: > + (prefix, clientCountry) = tree_results[0] > + return clientCountry > + > + # attempt IPv6, then IPv6 6to4 as IPv4, then Teredo, then IPv4 > + try: > + if ip.version() == 6: > + if gipv6 is not None: > + clientCountry = gipv6.country_code_by_addr_v6( > + ip.strNormal()) > + if clientCountry is None: > + # Try the IPv6-to-IPv4 translation schemes > + for scheme in (convert_6to4_v4, convert_teredo_v4): > + result = scheme(ip) > + if result is not None: > + ip = result > + break > + if ip.version() == 4 and gipv4 is not None: > + clientCountry = gipv4.country_code_by_addr(ip.strNormal()) > + except: > + pass > + return clientCountry > + > + > +def do_mirrorlist(kwargs): > + global debug > + global logfile > + > + def return_error(kwargs, message='', returncode=200): > + d = dict( > + returncode=returncode, > + message=message, > + resulttype='mirrorlist', > + results=[]) > + if 'metalink' in kwargs and kwargs['metalink']: > + d['resulttype'] = 'metalink' > + d['results'] = metalink_failuredoc(message) > + return d > + > + if not (kwargs.has_key('repo') \ > + and kwargs.has_key('arch')) \ > + and not kwargs.has_key('path'): > + return return_error( > + kwargs, > + message='# either path=, or repo= and arch= must be specified') > + > + file = None > + cache = None > + pathIsDirectory = False > + if kwargs.has_key('path'): > + path = kwargs['path'].strip('/') > + > + # Strip duplicate "//" from the path > + path = path.replace('//', '/') > + > + header = "# path = %s " % (path) > + > + sdir = path.split('/') > + try: > + # path was to a directory > + cache = mirrorlist_cache['/'.join(sdir)] > + pathIsDirectory=True > + except KeyError: > + # path was to a file, try its directory > + file = sdir[-1] > + sdir = sdir[:-1] > + try: > + cache = mirrorlist_cache['/'.join(sdir)] > + except KeyError: > + return return_error( > + kwargs, message=header + 'error: invalid path') > + dir = '/'.join(sdir) > + else: > + if u'source' in kwargs['repo']: > + kwargs['arch'] = u'source' > + repo = repo_redirect.get(kwargs['repo'], kwargs['repo']) > + arch = kwargs['arch'] > + header = "# repo = %s arch = %s " % (repo, arch) > + > + 
if repo in disabled_repositories: > + return return_error(kwargs, message=header + 'repo disabled') > + try: > + dir = repo_arch_to_directoryname[(repo, arch)] > + if 'metalink' in kwargs and kwargs['metalink']: > + dir += '/repodata' > + file = 'repomd.xml' > + else: > + pathIsDirectory=True > + cache = mirrorlist_cache[dir] > + except KeyError: > + repos = repo_arch_to_directoryname.keys() > + repos.sort() > + repo_information = header + "error: invalid repo or arch\n" > + repo_information += "# following repositories are available:\n" > + for i in repos: > + if i[0] is not None and i[1] is not None: > + repo_information += "# repo=%s&arch=%s\n" % i > + return return_error(kwargs, message=repo_information) > + > + # set kwargs['IP'] exactly once > + try: > + kwargs['IP'] = IP(kwargs['client_ip']) > + except: > + kwargs['IP'] = None > + > + ordered_mirrorlist = cache.get( > + 'ordered_mirrorlist', default_ordered_mirrorlist) > + done = 0 > + location_results = set() > + netblock_results = set() > + asn_results = set() > + internet2_results = set() > + country_results = set() > + geoip_results = set() > + continent_results = set() > + global_results = set() > + > + header, location_results = do_location(kwargs, header) > + > + requested_countries = [] > + if kwargs.has_key('country'): > + requested_countries = uniqueify( > + [c.upper() for c in kwargs['country'].split(',') ]) > + > + # if they specify a country, don't use netblocks or ASN > + if not 'country' in kwargs: > + header, netblock_results = do_netblocks(kwargs, cache, header) > + if len(netblock_results) > 0: > + if not ordered_mirrorlist: > + done=1 > + > + if not done: > + header, asn_results = do_asn(kwargs, cache, header) > + if len(asn_results) + len(netblock_results) >= 3: > + if not ordered_mirrorlist: > + done = 1 > + > + clientCountry = client_ip_to_country(kwargs['IP']) > + > + if clientCountry is None: > + print_client_country = "N/A" > + else: > + print_client_country = clientCountry > + > + if debug and kwargs.has_key('repo') and kwargs.has_key('arch'): > + msg = "IP: %s; DATE: %s; COUNTRY: %s; REPO: %s; ARCH: %s\n" % ( > + (kwargs['IP'] or 'None'), time.strftime("%Y-%m-%d"), > + print_client_country, kwargs['repo'], kwargs['arch']) > + > + sys.stdout.write(msg) > + sys.stdout.flush() > + > + if logfile is not None: > + logfile.write(msg) > + logfile.flush() > + > + if not done: > + header, internet2_results = do_internet2( > + kwargs, cache, clientCountry, header) > + if len(internet2_results) + len(netblock_results) + len(asn_results) > >= 3: > + if not ordered_mirrorlist: > + done = 1 > + > + if not done and 'country' in kwargs: > + header, country_results = do_country( > + kwargs, cache, clientCountry, requested_countries, header) > + if len(country_results) == 0: > + header, continent_results = do_continent( > + kwargs, cache, clientCountry, requested_countries, header) > + done = 1 > + > + if not done: > + header, geoip_results = do_geoip( > + kwargs, cache, clientCountry, header) > + if len(geoip_results) >= 3: > + if not ordered_mirrorlist: > + done = 1 > + > + if not done: > + header, continent_results = do_continent( > + kwargs, cache, clientCountry, [], header) > + if len(geoip_results) + len(continent_results) >= 3: > + done = 1 > + > + if not done: > + header, global_results = do_global( > + kwargs, cache, clientCountry, header) > + > + def _random_shuffle(s): > + l = list(s) > + random.shuffle(l) > + return l > + > + def _ordered_netblocks(s): > + def ipy_len(t): > + (prefix, hostid) = t > + return 
IP(prefix).len() > + v4_netblocks = [] > + v6_netblocks = [] > + for (prefix, hostid) in s: > + ip = IP(prefix) > + if ip.version() == 4: > + v4_netblocks.append((prefix, hostid)) > + elif ip.version() == 6: > + v6_netblocks.append((prefix, hostid)) > + # mix up the order, as sort will preserve same-key ordering > + random.shuffle(v4_netblocks) > + v4_netblocks.sort(key=ipy_len) > + random.shuffle(v6_netblocks) > + v6_netblocks.sort(key=ipy_len) > + v4_netblocks = [t[1] for t in v4_netblocks] > + v6_netblocks = [t[1] for t in v6_netblocks] > + return v6_netblocks + v4_netblocks > + > + def whereismymirror(result_sets): > + return_string = 'None' > + allhosts = [] > + found = False > + for (l,s,f) in result_sets: > + if len(l) > 0: > + allhosts.extend(f(l)) > + if not found: > + return_string = s > + found = True > + > + allhosts = uniqueify(allhosts) > + return allhosts, return_string > + > + result_sets = [ > + (location_results, "location", _random_shuffle), > + (netblock_results, "netblocks", _ordered_netblocks), > + (asn_results, "asn", _random_shuffle), > + (internet2_results, "I2", _random_shuffle), > + (country_results, "country", shuffle), > + (geoip_results, "geoip", shuffle), > + (continent_results, "continent", shuffle), > + (global_results, "global", shuffle), > + ] > + > + allhosts, where_string = whereismymirror(result_sets) > + try: > + ip_str = kwargs['IP'].strNormal() > + except: > + ip_str = 'Unknown IP' > + log_string = "mirrorlist: %s found its best mirror from %s" % ( > + ip_str, where_string) > + syslogger.info(log_string) > + > + hosts_and_urls = append_path( > + allhosts, cache, file, pathIsDirectory=pathIsDirectory) > + > + if 'metalink' in kwargs and kwargs['metalink']: > + (resulttype, returncode, results)=metalink( > + cache, dir, file, hosts_and_urls) > + d = dict( > + message=None, > + resulttype=resulttype, > + returncode=returncode, > + results=results) > + return d > + > + else: > + host_url_list = trim_to_preferred_protocols(hosts_and_urls) > + d = dict( > + message=header, > + resulttype='mirrorlist', > + returncode=200, > + results=host_url_list) > + return d > + > + > +def setup_cache_tree(cache, field): > + tree = radix.Radix() > + for k, v in cache.iteritems(): > + node = tree.add(k.strNormal()) > + node.data[field] = v > + return tree > + > + > +def setup_netblocks(netblocks_file, asns_wanted=None): > + tree = radix.Radix() > + if netblocks_file is not None: > + try: > + f = open(netblocks_file, 'r') > + except: > + return tree > + for l in f: > + try: > + s = l.split() > + start, mask = s[0].split('/') > + mask = int(mask) > + if mask == 0: continue > + asn = int(s[1]) > + if asns_wanted is None or asn in asns_wanted: > + node = tree.add(s[0]) > + node.data['asn'] = asn > + except: > + pass > + f.close() > + return tree > + > + > +def read_caches(): > + global mirrorlist_cache > + global host_netblock_cache > + global host_country_allowed_cache > + global host_max_connections_cache > + global repo_arch_to_directoryname > + global repo_redirect > + global country_continent_redirect_cache > + global disabled_repositories > + global host_bandwidth_cache > + global host_country_cache > + global file_details_cache > + global hcurl_cache > + global asn_host_cache > + global location_cache > + global netblock_country_cache > + > + data = {} > + try: > + f = open(cachefile, 'r') > + data = pickle.load(f) > + f.close() > + except: > + pass > + > + if 'mirrorlist_cache' in data: > + mirrorlist_cache = data['mirrorlist_cache'] > + if 'host_netblock_cache' in 
data: > + host_netblock_cache = data['host_netblock_cache'] > + if 'host_country_allowed_cache' in data: > + host_country_allowed_cache = data['host_country_allowed_cache'] > + if 'repo_arch_to_directoryname' in data: > + repo_arch_to_directoryname = data['repo_arch_to_directoryname'] > + if 'repo_redirect_cache' in data: > + repo_redirect = data['repo_redirect_cache'] > + if 'country_continent_redirect_cache' in data: > + country_continent_redirect_cache = data[ > + 'country_continent_redirect_cache'] > + if 'disabled_repositories' in data: > + disabled_repositories = data['disabled_repositories'] > + if 'host_bandwidth_cache' in data: > + host_bandwidth_cache = data['host_bandwidth_cache'] > + if 'host_country_cache' in data: > + host_country_cache = data['host_country_cache'] > + if 'file_details_cache' in data: > + file_details_cache = data['file_details_cache'] > + if 'hcurl_cache' in data: > + hcurl_cache = data['hcurl_cache'] > + if 'asn_host_cache' in data: > + asn_host_cache = data['asn_host_cache'] > + if 'location_cache' in data: > + location_cache = data['location_cache'] > + if 'netblock_country_cache' in data: > + netblock_country_cache = data['netblock_country_cache'] > + if 'host_max_connections_cache' in data: > + host_max_connections_cache = data['host_max_connections_cache'] > + > + setup_continents() > + global internet2_tree > + global global_tree > + global host_netblocks_tree > + global netblock_country_tree > + > + internet2_tree = setup_netblocks(internet2_netblocks_file) > + global_tree = setup_netblocks(global_netblocks_file, asn_host_cache) > + # host_netblocks_tree key is a netblock, value is a list of host IDs > + host_netblocks_tree = setup_cache_tree(host_netblock_cache, 'hosts') > + # netblock_country_tree key is a netblock, value is a single country > string > + netblock_country_tree = setup_cache_tree( > + netblock_country_cache, 'country') > + > + > +def errordoc(metalink, message): > + if metalink: > + doc = metalink_failuredoc(message) > + else: > + doc = message > + return doc > + > + > +class MirrorlistHandler(StreamRequestHandler): > + def handle(self): > + signal.signal(signal.SIGHUP, signal.SIG_IGN) > + random.seed() > + try: > + # read size of incoming pickle > + readlen = 0 > + size = '' > + while readlen < 10: > + size += self.rfile.read(10 - readlen) > + readlen = len(size) > + size = atoi(size) > + > + # read the pickle > + readlen = 0 > + p = '' > + while readlen < size: > + p += self.rfile.read(size - readlen) > + readlen = len(p) > + d = pickle.loads(p) > + self.connection.shutdown(socket.SHUT_RD) > + except: > + pass > + > + try: > + try: > + r = do_mirrorlist(d) > + except: > + raise > + message = r['message'] > + results = r['results'] > + resulttype = r['resulttype'] > + returncode = r['returncode'] > + except Exception, e: > + message=u'# Bad Request %s\n# %s' % (e, d) > + exception_msg = traceback.format_exc(e) > + sys.stderr.write(message+'\n') > + sys.stderr.write(exception_msg) > + sys.stderr.flush() > + returncode = 400 > + results = [] > + resulttype = 'mirrorlist' > + if d['metalink']: > + resulttype = 'metalink' > + results = errordoc(d['metalink'], message) > + > + try: > + p = pickle.dumps({ > + 'message':message, > + 'resulttype':resulttype, > + 'results':results, > + 'returncode':returncode}) > + self.connection.sendall(zfill('%s' % len(p), 10)) > + > + self.connection.sendall(p) > + self.connection.shutdown(socket.SHUT_WR) > + except: > + pass > + > + > +def sighup_handler(signum, frame): > + global logfile > + if logfile 
is not None: > + name = logfile.name > + logfile.close() > + logfile = open(name, 'a') > + > + # put this in a separate thread so it doesn't block clients > + if threading.active_count() < 2: > + thread = threading.Thread(target=load_databases_and_caches) > + thread.daemon = False > + try: > + thread.start() > + except KeyError: > + # bug fix for handing an exception when unable to delete from > + #_limbo even though it's not in limbo > + # > https://code.google.com/p/googleappengine/source/browse/trunk/python/google/appengine/dist27/threading.py?r=327 > + pass > + > + > +def sigterm_handler(signum, frame): > + global must_die > + signal.signal(signal.SIGHUP, signal.SIG_IGN) > + signal.signal(signal.SIGTERM, signal.SIG_IGN) > + if signum == signal.SIGTERM: > + must_die = True > + > + > +class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): > + request_queue_size = 300 > + def finish_request(self, request, client_address): > + signal.signal(signal.SIGHUP, signal.SIG_IGN) > + BaseServer.finish_request(self, request, client_address) > + > + > +def parse_args(): > + global cachefile > + global socketfile > + global internet2_netblocks_file > + global global_netblocks_file > + global debug > + global logfile > + global pidfile > + opts, args = getopt.getopt( > + sys.argv[1:], "c:i:g:p:s:dl:", > + [ > + "cache", "internet2_netblocks", "global_netblocks", > + "pidfile", "socket", "debug", "log=" > + ] > + ) > + for option, argument in opts: > + if option in ("-c", "--cache"): > + cachefile = argument > + if option in ("-i", "--internet2_netblocks"): > + internet2_netblocks_file = argument > + if option in ("-g", "--global_netblocks"): > + global_netblocks_file = argument > + if option in ("-s", "--socket"): > + socketfile = argument > + if option in ("-p", "--pidfile"): > + pidfile = argument > + if option in ("-l", "--log"): > + try: > + logfile = open(argument, 'a') > + except: > + logfile = None > + if option in ("-d", "--debug"): > + debug = True > + > + > +def open_geoip_databases(): > + global gipv4 > + global gipv6 > + try: > + gipv4 = GeoIP.open( > + "/usr/share/GeoIP/GeoIP.dat", GeoIP.GEOIP_STANDARD) > + except: > + gipv4=None > + try: > + gipv6 = GeoIP.open( > + "/usr/share/GeoIP/GeoIPv6.dat", GeoIP.GEOIP_STANDARD) > + except: > + gipv6=None > + > + > +def convert_6to4_v4(ip): > + all_6to4 = IP('2002::/16') > + if ip.version() != 6 or ip not in all_6to4: > + return None > + parts=ip.strNormal().split(':') > + > + ab = int(parts[1],16) > + a = (ab >> 8) & 0xFF > + b = ab & 0xFF > + cd = int(parts[2],16) > + c = (cd >> 8) & 0xFF > + d = cd & 0xFF > + > + v4addr = '%d.%d.%d.%d' % (a,b,c,d) > + return IP(v4addr) > + > + > +def convert_teredo_v4(ip): > + teredo_std = IP('2001::/32') > + teredo_xp = IP('3FFE:831F::/32') > + if ip.version() != 6 or (ip not in teredo_std and ip not in teredo_xp): > + return None > + parts=ip.strNormal().split(':') > + > + ab = int(parts[6],16) > + a = ((ab >> 8) & 0xFF) ^ 0xFF > + b = (ab & 0xFF) ^ 0xFF > + cd = int(parts[7],16) > + c = ((cd >> 8) & 0xFF) ^ 0xFF > + d = (cd & 0xFF) ^ 0xFF > + > + v4addr = '%d.%d.%d.%d' % (a,b,c,d) > + return IP(v4addr) > + > + > +def load_databases_and_caches(*args, **kwargs): > + sys.stderr.write("load_databases_and_caches...") > + sys.stderr.flush() > + open_geoip_databases() > + read_caches() > + sys.stderr.write("done.\n") > + sys.stderr.flush() > + > + > +def remove_pidfile(pidfile): > + os.unlink(pidfile) > + > + > +def create_pidfile_dir(pidfile): > + piddir = os.path.dirname(pidfile) > + if not piddir: > + 
return > + try: > + os.makedirs(piddir, mode=0755) > + except OSError, err: > + if err.errno == 17: # File exists > + pass > + else: > + raise > + except: > + raise > + > + > +def write_pidfile(pidfile, pid): > + create_pidfile_dir(pidfile) > + f = open(pidfile, 'w') > + f.write(str(pid)+'\n') > + f.close() > + return 0 > + > + > +def manage_pidfile(pidfile): > + """returns 1 if another process is running that is named in pidfile, > + otherwise creates/writes pidfile and returns 0.""" > + pid = os.getpid() > + try: > + f = open(pidfile, 'r') > + except IOError, err: > + if err.errno == 2: # No such file or directory > + return write_pidfile(pidfile, pid) > + return 1 > + > + oldpid=f.read() > + f.close() > + > + # is the oldpid process still running? > + try: > + os.kill(int(oldpid), 0) > + except ValueError: # malformed oldpid > + return write_pidfile(pidfile, pid) > + except OSError, err: > + if err.errno == 3: # No such process > + return write_pidfile(pidfile, pid) > + return 1 > + > + > +def main(): > + global logfile > + global pidfile > + signal.signal(signal.SIGHUP, signal.SIG_IGN) > + parse_args() > + manage_pidfile(pidfile) > + > + oldumask = os.umask(0) > + try: > + os.unlink(socketfile) > + except: > + pass > + > + load_databases_and_caches() > + signal.signal(signal.SIGHUP, sighup_handler) > + # restart interrupted syscalls like select > + signal.siginterrupt(signal.SIGHUP, False) > + ss = ForkingUnixStreamServer(socketfile, MirrorlistHandler) > + > + while not must_die: > + try: > + ss.serve_forever() > + except select.error: > + pass > + > + try: > + os.unlink(socketfile) > + except: > + pass > + > + if logfile is not None: > + try: > + logfile.close() > + except: > + pass > + > + remove_pidfile(pidfile) > + return 0 > + > + > +if __name__ == "__main__": > + try: > + sys.exit(main()) > + except KeyboardInterrupt: > + sys.exit(-1) > > _______________________________________________ > infrastructure mailing list > infrastructure@xxxxxxxxxxxxxxxxxxxxxxx > http://lists.fedoraproject.org/admin/infrastructure@xxxxxxxxxxxxxxxxxxxxxxx > _______________________________________________ infrastructure mailing list infrastructure@xxxxxxxxxxxxxxxxxxxxxxx http://lists.fedoraproject.org/admin/infrastructure@xxxxxxxxxxxxxxxxxxxxxxx
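For readers wondering why the default limit can make the service hang, here is a
simplified, unofficial sketch of the child-reaping behaviour in Python 2.7's
SocketServer.ForkingMixIn that the commit message quotes. The class and method
bodies below are illustrative only (not the actual stdlib or MirrorManager code);
they show how the parent blocks in waitpid() once max_children forked workers are
alive, which is the stall the hotfix avoids by raising the limit above the 45
mirrorlist_client.wsgi processes.

# Simplified sketch of ForkingMixIn-style child handling (Python 2 flavour).
# Before forking for each new request the parent reaps children; once the
# number of live children reaches max_children it blocks in waitpid().
import os


class ForkingSketch(object):
    max_children = 40          # stdlib default; the hotfix raises this to 80
    active_children = None     # pids of forked request handlers

    def collect_children(self):
        if self.active_children is None:
            return
        # Over the limit: block until enough children have exited.
        while len(self.active_children) >= self.max_children:
            try:
                pid, _ = os.waitpid(-1, 0)        # blocking wait for any child
            except OSError:
                break
            if pid in self.active_children:
                self.active_children.remove(pid)
        # Under the limit: reap already-exited children without blocking.
        for child in list(self.active_children):
            try:
                pid, _ = os.waitpid(child, os.WNOHANG)
            except OSError:
                pid = child                       # child already reaped elsewhere
            if pid:
                self.active_children.remove(pid)

    def process_request(self, request, client_address):
        self.collect_children()                   # may block as shown above
        pid = os.fork()
        if pid:                                   # parent: remember the child
            if self.active_children is None:
                self.active_children = []
            self.active_children.append(pid)
            return
        # child: the real code handles the request here, then exits
        os._exit(0)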