Re: Freeze Break Request: Increase number of children for mirrorlist-server

Sounds good to me.
If it breaks anything, we can always just reinstall the mirrormanager2-mirrorlist package to return to the current version.

With kind regards,
Patrick Uiterwijk
Fedora Infra

----- Original Message -----
> As there have been more reports of timeouts connected to the mirrorlist
> servers, I had a closer look at the code. It seems we are hitting the
> default value of max_children = 40. This freeze break request tries to
> increase it to 80 via a hotfix. See the commit message below for further
> details.
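> 
> For the record, the inherited default is easy to confirm from an
> interactive Python 2 session (just a quick sanity check):
> 
>     >>> from SocketServer import ForkingMixIn
>     >>> ForkingMixIn.max_children
>     40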
> 
> 		Adrian
> 
> 
> commit 2b8e284e687d097c2cad6cd24dc3db3aa23f837d
> Author: Adrian Reber <adrian@xxxxxxxx>
> Date:   Thu Oct 15 14:03:02 2015 +0000
> 
>     Increase the number of possible child processes
>     
>     The mirrorlist-server is the process which has the mirrorlist data
>     loaded and which is accessed by the public-facing
>     mirrorlist_client.wsgi. The mirrorlist-server uses the
>     ForkingUnixStreamServer, which has a default of max_children = 40.
>     (https://hg.python.org/cpython/file/2.7/Lib/SocketServer.py#l516)
>     
>     Looking at the code of ForkingMixIn (from which
>     ForkingUnixStreamServer inherits), it says at
>     
>     https://hg.python.org/cpython/file/2.7/Lib/SocketServer.py#l523
>     
>       # If we're above the max number of children, wait and reap them until
>       # we go back below threshold. Note that we use waitpid(-1) below to be
>       # able to collect children in size(<defunct children>) syscalls instead
>       # of size(<children>): the downside is that this might reap children
>       # which we didn't spawn, which is why we only resort to this when we're
>       # above max_children.
>     
>     As we are running the wsgi with processes=45, this sounds like it
>     can lead to a situation where it might just hang.
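>     
>     For reference, the blocking happens in ForkingMixIn's
>     collect_children(), which does roughly the following (a paraphrase,
>     not a verbatim copy of the stdlib source):
>     
>       import os
>       
>       def collect_children(self):  # sketch of SocketServer's version
>           # While at or above max_children, block in waitpid() until
>           # *some* child exits; no new request is handled until the
>           # count drops back below the threshold.
>           while len(self.active_children) >= self.max_children:
>               try:
>                   pid, status = os.waitpid(-1, 0)
>               except OSError:
>                   break
>               if pid in self.active_children:
>                   self.active_children.remove(pid)
>     
>     So once 40 children are busy serving requests, the accept loop
>     stalls in waitpid() instead of forking a handler for the next one.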
>     
>     This increases max_children to 80, and maybe processes could be
>     upped to 60 or 70.
>     
>     Signed-off-by: Adrian Reber <adrian@xxxxxxxx>
> 
> diff --git a/files/hotfix/mirrorlist/mirrorlist_server.py b/files/hotfix/mirrorlist/mirrorlist_server.py
> index 2d98fa0..2b81912 100644
> --- a/files/hotfix/mirrorlist/mirrorlist_server.py
> +++ b/files/hotfix/mirrorlist/mirrorlist_server.py
> @@ -938,6 +938,7 @@ def sigterm_handler(signum, frame):
>  
>  class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer):
>      request_queue_size = 300
> +    max_children = 80
>      def finish_request(self, request, client_address):
>          signal.signal(signal.SIGHUP, signal.SIG_IGN)
>          BaseServer.finish_request(self, request, client_address)
> diff --git a/roles/mirrormanager/mirrorlist2/tasks/main.yml b/roles/mirrormanager/mirrorlist2/tasks/main.yml
> index 786432f..2e5f5ee 100644
> --- a/roles/mirrormanager/mirrorlist2/tasks/main.yml
> +++ b/roles/mirrormanager/mirrorlist2/tasks/main.yml
> @@ -73,3 +73,12 @@
>  #  tags:
>  #  - mirrorlist2
>  #  - selinux
> +
> +
> +- name: HOTFIX - increase the number of possible child processes
> +  copy: src="{{ files }}/hotfix/mirrorlist/mirrorlist_server.py" dest=/usr/share/mirrormanager2/mirrorlist_server.py
> +        owner=root group=root mode=0755
> +  tags:
> +  - hotfix
> +  notify:
> +  - restart mirrorlist-server
> 
> commit d04588274a6e161a36d8dbd9c91ea0fe78de017e
> Author: Adrian Reber <adrian@xxxxxxxx>
> Date:   Thu Oct 15 13:54:53 2015 +0000
> 
>     Original mirrorlist_server.py which needs to be hotfixed
>     
>     Signed-off-by: Adrian Reber <adrian@xxxxxxxx>
> 
> diff --git a/files/hotfix/mirrorlist/mirrorlist_server.py b/files/hotfix/mirrorlist/mirrorlist_server.py
> new file mode 100644
> index 0000000..2d98fa0
> --- /dev/null
> +++ b/files/hotfix/mirrorlist/mirrorlist_server.py
> @@ -0,0 +1,1136 @@
> +#!/usr/bin/python
> +#
> +# Copyright (c) 2007-2013 Dell, Inc.
> +#  by Matt Domsch <Matt_Domsch@xxxxxxxx>
> +# Licensed under the MIT/X11 license
> +
> +# standard library modules in alphabetical order
> +from collections import defaultdict
> +import datetime
> +import getopt
> +import logging
> +import logging.handlers
> +import os
> +import random
> +import cPickle as pickle
> +import select
> +import signal
> +import socket
> +from SocketServer import (StreamRequestHandler, ForkingMixIn,
> +                          UnixStreamServer, BaseServer)
> +import sys
> +from string import zfill, atoi
> +import time
> +import traceback
> +
> +try:
> +    import threading
> +except ImportError:
> +    import dummy_threading as threading
> +
> +# not-so-standard library modules that this program needs
> +from IPy import IP
> +import GeoIP
> +import radix
> +from weighted_shuffle import weighted_shuffle
> +
> +# can be overridden on the command line
> +pidfile = '/var/run/mirrormanager/mirrorlist_server.pid'
> +socketfile = '/var/run/mirrormanager/mirrorlist_server.sock'
> +cachefile = '/var/lib/mirrormanager/mirrorlist_cache.pkl'
> +internet2_netblocks_file = '/var/lib/mirrormanager/i2_netblocks.txt'
> +global_netblocks_file = '/var/lib/mirrormanager/global_netblocks.txt'
> +logfile = None
> +debug = False
> +must_die = False
> +# at a point in time when we're no longer serving content for versions
> +# that don't use yum prioritymethod=fallback
> +# (e.g. after Fedora 7 is past end-of-life)
> +# then we can set this value to True
> +# this only affects results requested using path=...
> +# for dirs which aren't repositories (such as iso/)
> +# because we don't know the Version associated with that dir here.
> +default_ordered_mirrorlist = False
> +
> +gipv4 = None
> +gipv6 = None
> +
> +# key is strings in tuple (repo.prefix, arch)
> +mirrorlist_cache = {}
> +
> +# key is directory.name, returns keys for mirrorlist_cache
> +directory_name_to_mirrorlist = {}
> +
> +# key is an IPy.IP structure, value is list of host ids
> +host_netblock_cache = {}
> +
> +# key is hostid, value is list of countries to allow
> +host_country_allowed_cache = {}
> +
> +repo_arch_to_directoryname = {}
> +
> +# redirect from a repo with one name to a repo with another
> +repo_redirect = {}
> +country_continent_redirect_cache = {}
> +
> +# our own private copy of country_continents to be edited
> +country_continents = GeoIP.country_continents
> +
> +disabled_repositories = {}
> +host_bandwidth_cache = {}
> +host_country_cache = {}
> +host_max_connections_cache = {}
> +file_details_cache = {}
> +hcurl_cache = {}
> +asn_host_cache = {}
> +internet2_tree = radix.Radix()
> +global_tree = radix.Radix()
> +host_netblocks_tree = radix.Radix()
> +netblock_country_tree = radix.Radix()
> +location_cache = {}
> +netblock_country_cache = {}
> +
> +## Set up our syslog data.
> +syslogger = logging.getLogger('mirrormanager')
> +syslogger.setLevel(logging.INFO)
> +handler = logging.handlers.SysLogHandler(
> +    address='/dev/log',
> +    facility=logging.handlers.SysLogHandler.LOG_LOCAL4)
> +syslogger.addHandler(handler)
> +
> +
> +def lookup_ip_asn(tree, ip):
> +    """ @t is a radix tree
> +        @ip is an IPy.IP object which may be contained in an entry in l
> +        """
> +    node = tree.search_best(ip.strNormal())
> +    if node is None:
> +        return None
> +    return node.data['asn']
> +
> +
> +def uniqueify(seq, idfun=None):
> +    # order preserving
> +    if idfun is None:
> +        def idfun(x): return x
> +    seen = {}
> +    result = []
> +    for item in seq:
> +        marker = idfun(item)
> +        # in old Python versions:
> +        # if seen.has_key(marker)
> +        # but in new ones:
> +        if marker in seen: continue
> +        seen[marker] = 1
> +        result.append(item)
> +    return result
> +
> +
> +##### Metalink Support #####
> +
> +def metalink_header():
> +    # fixme add alternate format pubdate when specified
> +    pubdate = datetime.datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
> +    doc = ''
> +    doc += '<?xml version="1.0" encoding="utf-8"?>\n'
> +    doc += '<metalink version="3.0" xmlns="http://www.metalinker.org/"'
> +    doc += ' type="dynamic"'
> +    doc += ' pubdate="%s"' % pubdate
> +    doc += ' generator="mirrormanager"'
> +    doc += ' xmlns:mm0="http://fedorahosted.org/mirrormanager"'
> +    doc += '>\n'
> +    return doc
> +
> +
> +def metalink_failuredoc(message=None):
> +    doc = metalink_header()
> +    if message is not None:
> +        doc += '<!--\n'
> +        doc += message + '\n'
> +        doc += '-->\n'
> +    doc += '</metalink>\n'
> +    return doc
> +
> +
> +def metalink_file_not_found(directory, file):
> +    message = '%s/%s not found or has no metalink' % (directory, file)
> +    return metalink_failuredoc(message)
> +
> +
> +def metalink(cache, directory, file, hosts_and_urls):
> +    preference = 100
> +    try:
> +        fdc = file_details_cache[directory]
> +        detailslist = fdc[file]
> +    except KeyError:
> +        return ('metalink', 404, metalink_file_not_found(directory, file))
> +
> +    def indent(n):
> +        return ' ' * n * 2
> +
> +    doc = metalink_header()
> +    doc += indent(1) + '<files>\n'
> +    doc += indent(2) + '<file name="%s">\n' % (file)
> +    y = detailslist[0]
> +
> +    def details(y, indentlevel=2):
> +        doc = ''
> +        if y['timestamp'] is not None:
> +            doc += indent(indentlevel+1) \
> +                + '<mm0:timestamp>%s</mm0:timestamp>\n' % y['timestamp']
> +        if y['size'] is not None:
> +            doc += indent(indentlevel+1) + '<size>%s</size>\n' % y['size']
> +        doc += indent(indentlevel+1) + '<verification>\n'
> +        hashes = ('md5', 'sha1', 'sha256', 'sha512')
> +        for h in hashes:
> +            if y[h] is not None:
> +                doc += indent(indentlevel+2) \
> +                    + '<hash type="%s">%s</hash>\n' % (h, y[h])
> +        doc += indent(indentlevel+1) + '</verification>\n'
> +        return doc
> +
> +    doc += details(y, 2)
> +    # there can be multiple files
> +    if len(detailslist) > 1:
> +        doc += indent(3) + '<mm0:alternates>\n'
> +        for y in detailslist[1:]:
> +            doc += indent(4) + '<mm0:alternate>\n'
> +            doc += details(y,5)
> +            doc += indent(4) + '</mm0:alternate>\n'
> +        doc += indent(3) + '</mm0:alternates>\n'
> +
> +    doc += indent(3) + '<resources maxconnections="1">\n'
> +    for (hostid, hcurls) in hosts_and_urls:
> +        private = ''
> +        if hostid not in cache['global']:
> +            private = 'mm0:private="True"'
> +        for url in hcurls:
> +            protocol = url.split(':')[0]
> +            # FIXME January 2010
> +            # adding protocol= here is not part of the Metalink 3.0 spec,
> +            # but MirrorManager 1.2.6 used it accidentally, as did
> +            # yum 3.2.20-3 as released in Fedora 8, 9, and 10.  After those
> +            # three are EOL (~January 2010), the extra protocol= can be
> +            # removed.
> +            doc += indent(4) + \
> +                '<url protocol="%s" type="%s" location="%s" '\
> +                'preference="%s" %s>' % (
> +                    protocol, protocol, host_country_cache[hostid].upper(),
> +                    preference, private)
> +            doc += url
> +            doc += '</url>\n'
> +        preference = max(preference-1, 1)
> +    doc += indent(3) + '</resources>\n'
> +    doc += indent(2) + '</file>\n'
> +    doc += indent(1) + '</files>\n'
> +    doc += '</metalink>\n'
> +    return ('metalink', 200, doc)
> +
> +
> +def tree_lookup(tree, ip, field, maxResults=None):
> +    # fast lookup in the tree; if present, find all the matching values by
> +    # deleting the found one and searching again this is safe w/o copying
> +    # the tree again only because this is the only place the tree is used,
> +    # and we'll get a new copy of the tree from our parent the next time it
> +    # fork()s.
> +    # returns a list of tuples (prefix, data)
> +    result = []
> +    len_data = 0
> +    if ip is None:
> +        return result
> +    node = tree.search_best(ip.strNormal())
> +    while node is not None:
> +        prefix = node.prefix
> +        if type(node.data[field]) == list:
> +            len_data += len(node.data[field])
> +        else:
> +            len_data += 1
> +        t = (prefix, node.data[field],)
> +        result.append(t)
> +        if maxResults is None or len_data < maxResults:
> +            tree.delete(prefix)
> +            node = tree.search_best(ip.strNormal())
> +        else:
> +            break
> +    return result
> +
> +
> +def trim_by_client_country(s, clientCountry):
> +    if clientCountry is None:
> +        return s
> +    r = s.copy()
> +    for hostid in s:
> +        if hostid in host_country_allowed_cache and \
> +               clientCountry not in host_country_allowed_cache[hostid]:
> +            r.remove(hostid)
> +    return r
> +
> +
> +def shuffle(s):
> +    l = []
> +    for hostid in s:
> +        item = (host_bandwidth_cache[hostid], hostid)
> +        l.append(item)
> +    newlist = weighted_shuffle(l)
> +    results = []
> +    for (bandwidth, hostid) in newlist:
> +        results.append(hostid)
> +    return results
> +
> +
> +continents = {}
> +
> +def handle_country_continent_redirect():
> +    new_country_continents = GeoIP.country_continents
> +    for country, continent in country_continent_redirect_cache.iteritems():
> +        new_country_continents[country] = continent
> +    global country_continents
> +    country_continents = new_country_continents
> +
> +
> +def setup_continents():
> +    new_continents = defaultdict(list)
> +    handle_country_continent_redirect()
> +    for c, continent in country_continents.iteritems():
> +        new_continents[continent].append(c)
> +    global continents
> +    continents = new_continents
> +
> +
> +def do_global(kwargs, cache, clientCountry, header):
> +    c = trim_by_client_country(cache['global'], clientCountry)
> +    header += 'country = global '
> +    return (header, c)
> +
> +
> +def do_countrylist(kwargs, cache, clientCountry, requested_countries, header):
> +
> +    def collapse(d):
> +        """ collapses a dict {key:set(hostids)} into a set of hostids """
> +        s = set()
> +        for country, hostids in d.iteritems():
> +            for hostid in hostids:
> +                s.add(hostid)
> +        return s
> +
> +    country_cache = {}
> +    for c in requested_countries:
> +        if c in cache['byCountry']:
> +            country_cache[c] = cache['byCountry'][c]
> +            header += 'country = %s ' % c
> +    s = collapse(country_cache)
> +    s = trim_by_client_country(s, clientCountry)
> +    return (header, s)
> +
> +
> +def get_same_continent_countries(clientCountry, requested_countries):
> +    result = []
> +    for r in requested_countries:
> +        if r in country_continents:
> +            requestedCountries = [
> +                c.upper() for c in continents[country_continents[r]]
> +                if c != clientCountry]
> +            result.extend(requestedCountries)
> +    result = uniqueify(result)
> +    return result
> +
> +
> +def do_continent(kwargs, cache, clientCountry, requested_countries, header):
> +    if len(requested_countries) > 0:
> +        rc = requested_countries
> +    else:
> +        rc = [clientCountry]
> +    clist = get_same_continent_countries(clientCountry, rc)
> +    return do_countrylist(kwargs, cache, clientCountry, clist, header)
> +
> +
> +def do_country(kwargs, cache, clientCountry, requested_countries, header):
> +    if 'GLOBAL' in requested_countries:
> +        return do_global(kwargs, cache, clientCountry, header)
> +    return do_countrylist(
> +        kwargs, cache, clientCountry, requested_countries, header)
> +
> +
> +def do_netblocks(kwargs, cache, header):
> +    hostresults = set()
> +    if not kwargs.has_key('netblock') or kwargs['netblock'] == "1":
> +        tree_results = tree_lookup(host_netblocks_tree, kwargs['IP'], 'hosts')
> +        for (prefix, hostids) in tree_results:
> +            for hostid in hostids:
> +                if hostid in cache['byHostId']:
> +                    hostresults.add((prefix, hostid,))
> +                    header += 'Using preferred netblock '
> +    return (header, hostresults)
> +
> +
> +def do_internet2(kwargs, cache, clientCountry, header):
> +    hostresults = set()
> +    ip = kwargs['IP']
> +    if ip is None:
> +        return (header, hostresults)
> +    asn = lookup_ip_asn(internet2_tree, ip)
> +    if asn is not None:
> +        header += 'Using Internet2 '
> +        if clientCountry is not None \
> +                and clientCountry in cache['byCountryInternet2']:
> +            hostresults = cache['byCountryInternet2'][clientCountry]
> +            hostresults = trim_by_client_country(hostresults, clientCountry)
> +    return (header, hostresults)
> +
> +
> +def do_asn(kwargs, cache, header):
> +    hostresults = set()
> +    ip = kwargs['IP']
> +    if ip is None:
> +        return (header, hostresults)
> +    asn = lookup_ip_asn(global_tree, ip)
> +    if asn is not None and asn in asn_host_cache:
> +        for hostid in asn_host_cache[asn]:
> +            if hostid in cache['byHostId']:
> +                hostresults.add(hostid)
> +                header += 'Using ASN %s ' % asn
> +    return (header, hostresults)
> +
> +
> +def do_geoip(kwargs, cache, clientCountry, header):
> +    hostresults = set()
> +    if clientCountry is not None and clientCountry in cache['byCountry']:
> +        hostresults = cache['byCountry'][clientCountry]
> +        header += 'country = %s ' % clientCountry
> +        hostresults = trim_by_client_country(hostresults, clientCountry)
> +    return (header, hostresults)
> +
> +
> +def do_location(kwargs, header):
> +    hostresults = set()
> +    if 'location' in kwargs and kwargs['location'] in location_cache:
> +        hostresults = set(location_cache[kwargs['location']])
> +        header += "Using location %s " % kwargs['location']
> +    return (header, hostresults)
> +
> +
> +def append_path(hosts, cache, file, pathIsDirectory=False):
> +    """ given a list of hosts, return a list of objects:
> +    [(hostid, [hcurls]), ... ]
> +    in the same order, appending file if it's not None"""
> +    subpath = None
> +    results = []
> +    if 'subpath' in cache:
> +        subpath = cache['subpath']
> +    for hostid in hosts:
> +        hcurls = []
> +        for hcurl_id in cache['byHostId'][hostid]:
> +            s = hcurl_cache[hcurl_id]
> +            if subpath is not None:
> +                s += "/" + subpath
> +            if file is None and pathIsDirectory:
> +                s += "/"
> +            if file is not None:
> +                if not s.endswith('/'):
> +                    s += "/"
> +                s += file
> +            hcurls.append(s)
> +        results.append((hostid, hcurls))
> +    return results
> +
> +
> +def trim_to_preferred_protocols(hosts_and_urls):
> +    """ remove all but http and ftp URLs,
> +    and if both http and ftp are offered,
> +    leave only http. Return [(hostid, url), ...] """
> +    results = []
> +    try_protocols = ('https', 'http', 'ftp')
> +    for (hostid, hcurls) in hosts_and_urls:
> +        protocols = {}
> +        url = None
> +        for hcurl in hcurls:
> +            for p in try_protocols:
> +                if hcurl.startswith(p+':'):
> +                    protocols[p] = hcurl
> +
> +        for p in try_protocols:
> +            if p in protocols:
> +                url = protocols[p]
> +                break
> +
> +        if url is not None:
> +            results.append((hostid, url))
> +    return results
> +
> +
> +def client_ip_to_country(ip):
> +    clientCountry = None
> +    if ip is None:
> +        return None
> +
> +    # lookup in the cache first
> +    tree_results = tree_lookup(
> +        netblock_country_tree, ip, 'country', maxResults=1)
> +    if len(tree_results) > 0:
> +        (prefix, clientCountry) = tree_results[0]
> +        return clientCountry
> +
> +    # attempt IPv6, then IPv6 6to4 as IPv4, then Teredo, then IPv4
> +    try:
> +        if ip.version() == 6:
> +            if gipv6 is not None:
> +                clientCountry = gipv6.country_code_by_addr_v6(
> +                    ip.strNormal())
> +            if clientCountry is None:
> +                # Try the IPv6-to-IPv4 translation schemes
> +                for scheme in (convert_6to4_v4, convert_teredo_v4):
> +                    result = scheme(ip)
> +                    if result is not None:
> +                        ip = result
> +                        break
> +        if ip.version() == 4 and gipv4 is not None:
> +            clientCountry = gipv4.country_code_by_addr(ip.strNormal())
> +    except:
> +        pass
> +    return clientCountry
> +
> +
> +def do_mirrorlist(kwargs):
> +    global debug
> +    global logfile
> +
> +    def return_error(kwargs, message='', returncode=200):
> +        d = dict(
> +            returncode=returncode,
> +            message=message,
> +            resulttype='mirrorlist',
> +            results=[])
> +        if 'metalink' in kwargs and kwargs['metalink']:
> +            d['resulttype'] = 'metalink'
> +            d['results'] = metalink_failuredoc(message)
> +        return d
> +
> +    if not (kwargs.has_key('repo') \
> +            and kwargs.has_key('arch')) \
> +            and not kwargs.has_key('path'):
> +        return return_error(
> +            kwargs,
> +            message='# either path=, or repo= and arch= must be specified')
> +
> +    file = None
> +    cache = None
> +    pathIsDirectory = False
> +    if kwargs.has_key('path'):
> +        path = kwargs['path'].strip('/')
> +
> +        # Strip duplicate "//" from the path
> +        path = path.replace('//', '/')
> +
> +        header = "# path = %s " % (path)
> +
> +        sdir = path.split('/')
> +        try:
> +            # path was to a directory
> +            cache = mirrorlist_cache['/'.join(sdir)]
> +            pathIsDirectory=True
> +        except KeyError:
> +            # path was to a file, try its directory
> +            file = sdir[-1]
> +            sdir = sdir[:-1]
> +            try:
> +                cache = mirrorlist_cache['/'.join(sdir)]
> +            except KeyError:
> +                return return_error(
> +                    kwargs, message=header + 'error: invalid path')
> +        dir = '/'.join(sdir)
> +    else:
> +        if u'source' in kwargs['repo']:
> +            kwargs['arch'] = u'source'
> +        repo = repo_redirect.get(kwargs['repo'], kwargs['repo'])
> +        arch = kwargs['arch']
> +        header = "# repo = %s arch = %s " % (repo, arch)
> +
> +        if repo in disabled_repositories:
> +            return return_error(kwargs, message=header + 'repo disabled')
> +        try:
> +            dir = repo_arch_to_directoryname[(repo, arch)]
> +            if 'metalink' in kwargs and kwargs['metalink']:
> +                dir += '/repodata'
> +                file = 'repomd.xml'
> +            else:
> +                pathIsDirectory=True
> +            cache = mirrorlist_cache[dir]
> +        except KeyError:
> +            repos = repo_arch_to_directoryname.keys()
> +            repos.sort()
> +            repo_information = header + "error: invalid repo or arch\n"
> +            repo_information += "# following repositories are available:\n"
> +            for i in repos:
> +                if i[0] is not None and i[1] is not None:
> +                    repo_information += "# repo=%s&arch=%s\n" % i
> +            return return_error(kwargs, message=repo_information)
> +
> +    # set kwargs['IP'] exactly once
> +    try:
> +        kwargs['IP'] = IP(kwargs['client_ip'])
> +    except:
> +        kwargs['IP'] = None
> +
> +    ordered_mirrorlist = cache.get(
> +        'ordered_mirrorlist', default_ordered_mirrorlist)
> +    done = 0
> +    location_results = set()
> +    netblock_results = set()
> +    asn_results = set()
> +    internet2_results = set()
> +    country_results = set()
> +    geoip_results = set()
> +    continent_results = set()
> +    global_results = set()
> +
> +    header, location_results = do_location(kwargs, header)
> +
> +    requested_countries = []
> +    if kwargs.has_key('country'):
> +        requested_countries = uniqueify(
> +            [c.upper() for c in kwargs['country'].split(',') ])
> +
> +    # if they specify a country, don't use netblocks or ASN
> +    if not 'country' in kwargs:
> +        header, netblock_results = do_netblocks(kwargs, cache, header)
> +        if len(netblock_results) > 0:
> +            if not ordered_mirrorlist:
> +                done=1
> +
> +        if not done:
> +            header, asn_results = do_asn(kwargs, cache, header)
> +            if len(asn_results) + len(netblock_results) >= 3:
> +                if not ordered_mirrorlist:
> +                    done = 1
> +
> +    clientCountry = client_ip_to_country(kwargs['IP'])
> +
> +    if clientCountry is None:
> +        print_client_country = "N/A"
> +    else:
> +        print_client_country = clientCountry
> +
> +    if debug and kwargs.has_key('repo') and kwargs.has_key('arch'):
> +        msg = "IP: %s; DATE: %s; COUNTRY: %s; REPO: %s; ARCH: %s\n"  % (
> +            (kwargs['IP'] or 'None'), time.strftime("%Y-%m-%d"),
> +            print_client_country, kwargs['repo'], kwargs['arch'])
> +
> +        sys.stdout.write(msg)
> +        sys.stdout.flush()
> +
> +        if logfile is not None:
> +            logfile.write(msg)
> +            logfile.flush()
> +
> +    if not done:
> +        header, internet2_results = do_internet2(
> +            kwargs, cache, clientCountry, header)
> +        if len(internet2_results) + len(netblock_results) + len(asn_results) >= 3:
> +            if not ordered_mirrorlist:
> +                done = 1
> +
> +    if not done and 'country' in kwargs:
> +        header, country_results  = do_country(
> +            kwargs, cache, clientCountry, requested_countries, header)
> +        if len(country_results) == 0:
> +            header, continent_results = do_continent(
> +                kwargs, cache, clientCountry, requested_countries, header)
> +        done = 1
> +
> +    if not done:
> +        header, geoip_results = do_geoip(
> +            kwargs, cache, clientCountry, header)
> +        if len(geoip_results) >= 3:
> +            if not ordered_mirrorlist:
> +                done = 1
> +
> +    if not done:
> +        header, continent_results = do_continent(
> +            kwargs, cache, clientCountry, [], header)
> +        if len(geoip_results) + len(continent_results) >= 3:
> +            done = 1
> +
> +    if not done:
> +        header, global_results = do_global(
> +            kwargs, cache, clientCountry, header)
> +
> +    def _random_shuffle(s):
> +        l = list(s)
> +        random.shuffle(l)
> +        return l
> +
> +    def _ordered_netblocks(s):
> +        def ipy_len(t):
> +            (prefix, hostid) = t
> +            return IP(prefix).len()
> +        v4_netblocks = []
> +        v6_netblocks = []
> +        for (prefix, hostid) in s:
> +            ip = IP(prefix)
> +            if ip.version() == 4:
> +                v4_netblocks.append((prefix, hostid))
> +            elif ip.version() == 6:
> +                v6_netblocks.append((prefix, hostid))
> +        # mix up the order, as sort will preserve same-key ordering
> +        random.shuffle(v4_netblocks)
> +        v4_netblocks.sort(key=ipy_len)
> +        random.shuffle(v6_netblocks)
> +        v6_netblocks.sort(key=ipy_len)
> +        v4_netblocks = [t[1] for t in v4_netblocks]
> +        v6_netblocks = [t[1] for t in v6_netblocks]
> +        return v6_netblocks + v4_netblocks
> +
> +    def whereismymirror(result_sets):
> +        return_string = 'None'
> +        allhosts = []
> +        found = False
> +        for (l,s,f) in result_sets:
> +            if len(l) > 0:
> +                allhosts.extend(f(l))
> +                if not found:
> +                    return_string = s
> +                    found = True
> +
> +        allhosts = uniqueify(allhosts)
> +        return allhosts, return_string
> +
> +    result_sets = [
> +        (location_results, "location", _random_shuffle),
> +        (netblock_results, "netblocks", _ordered_netblocks),
> +        (asn_results, "asn", _random_shuffle),
> +        (internet2_results, "I2", _random_shuffle),
> +        (country_results, "country", shuffle),
> +        (geoip_results, "geoip", shuffle),
> +        (continent_results, "continent", shuffle),
> +        (global_results, "global", shuffle),
> +        ]
> +
> +    allhosts, where_string = whereismymirror(result_sets)
> +    try:
> +        ip_str = kwargs['IP'].strNormal()
> +    except:
> +        ip_str = 'Unknown IP'
> +    log_string = "mirrorlist: %s found its best mirror from %s" % (
> +        ip_str, where_string)
> +    syslogger.info(log_string)
> +
> +    hosts_and_urls = append_path(
> +        allhosts, cache, file, pathIsDirectory=pathIsDirectory)
> +
> +    if 'metalink' in kwargs and kwargs['metalink']:
> +        (resulttype, returncode, results)=metalink(
> +            cache, dir, file, hosts_and_urls)
> +        d = dict(
> +            message=None,
> +            resulttype=resulttype,
> +            returncode=returncode,
> +            results=results)
> +        return d
> +
> +    else:
> +        host_url_list = trim_to_preferred_protocols(hosts_and_urls)
> +        d = dict(
> +            message=header,
> +            resulttype='mirrorlist',
> +            returncode=200,
> +            results=host_url_list)
> +        return d
> +
> +
> +def setup_cache_tree(cache, field):
> +    tree = radix.Radix()
> +    for k, v in cache.iteritems():
> +        node = tree.add(k.strNormal())
> +        node.data[field] = v
> +    return tree
> +
> +
> +def setup_netblocks(netblocks_file, asns_wanted=None):
> +    tree = radix.Radix()
> +    if netblocks_file is not None:
> +        try:
> +            f = open(netblocks_file, 'r')
> +        except:
> +            return tree
> +        for l in f:
> +            try:
> +                s = l.split()
> +                start, mask = s[0].split('/')
> +                mask = int(mask)
> +                if mask == 0: continue
> +                asn = int(s[1])
> +                if asns_wanted is None or asn in asns_wanted:
> +                    node = tree.add(s[0])
> +                    node.data['asn'] = asn
> +            except:
> +                pass
> +        f.close()
> +    return tree
> +
> +
> +def read_caches():
> +    global mirrorlist_cache
> +    global host_netblock_cache
> +    global host_country_allowed_cache
> +    global host_max_connections_cache
> +    global repo_arch_to_directoryname
> +    global repo_redirect
> +    global country_continent_redirect_cache
> +    global disabled_repositories
> +    global host_bandwidth_cache
> +    global host_country_cache
> +    global file_details_cache
> +    global hcurl_cache
> +    global asn_host_cache
> +    global location_cache
> +    global netblock_country_cache
> +
> +    data = {}
> +    try:
> +        f = open(cachefile, 'r')
> +        data = pickle.load(f)
> +        f.close()
> +    except:
> +        pass
> +
> +    if 'mirrorlist_cache' in data:
> +        mirrorlist_cache = data['mirrorlist_cache']
> +    if 'host_netblock_cache' in data:
> +        host_netblock_cache = data['host_netblock_cache']
> +    if 'host_country_allowed_cache' in data:
> +        host_country_allowed_cache = data['host_country_allowed_cache']
> +    if 'repo_arch_to_directoryname' in data:
> +        repo_arch_to_directoryname = data['repo_arch_to_directoryname']
> +    if 'repo_redirect_cache' in data:
> +        repo_redirect = data['repo_redirect_cache']
> +    if 'country_continent_redirect_cache' in data:
> +        country_continent_redirect_cache = data[
> +            'country_continent_redirect_cache']
> +    if 'disabled_repositories' in data:
> +        disabled_repositories = data['disabled_repositories']
> +    if 'host_bandwidth_cache' in data:
> +        host_bandwidth_cache = data['host_bandwidth_cache']
> +    if 'host_country_cache' in data:
> +        host_country_cache = data['host_country_cache']
> +    if 'file_details_cache' in data:
> +        file_details_cache = data['file_details_cache']
> +    if 'hcurl_cache' in data:
> +        hcurl_cache = data['hcurl_cache']
> +    if 'asn_host_cache' in data:
> +        asn_host_cache = data['asn_host_cache']
> +    if 'location_cache' in data:
> +        location_cache = data['location_cache']
> +    if 'netblock_country_cache' in data:
> +        netblock_country_cache = data['netblock_country_cache']
> +    if 'host_max_connections_cache' in data:
> +        host_max_connections_cache = data['host_max_connections_cache']
> +
> +    setup_continents()
> +    global internet2_tree
> +    global global_tree
> +    global host_netblocks_tree
> +    global netblock_country_tree
> +
> +    internet2_tree = setup_netblocks(internet2_netblocks_file)
> +    global_tree    = setup_netblocks(global_netblocks_file, asn_host_cache)
> +    # host_netblocks_tree key is a netblock, value is a list of host IDs
> +    host_netblocks_tree = setup_cache_tree(host_netblock_cache, 'hosts')
> +    # netblock_country_tree key is a netblock, value is a single country string
> +    netblock_country_tree = setup_cache_tree(
> +        netblock_country_cache, 'country')
> +
> +
> +def errordoc(metalink, message):
> +    if metalink:
> +        doc = metalink_failuredoc(message)
> +    else:
> +        doc = message
> +    return doc
> +
> +
> +class MirrorlistHandler(StreamRequestHandler):
> +    def handle(self):
> +        signal.signal(signal.SIGHUP, signal.SIG_IGN)
> +        random.seed()
> +        try:
> +            # read size of incoming pickle
> +            readlen = 0
> +            size = ''
> +            while readlen < 10:
> +                size += self.rfile.read(10 - readlen)
> +                readlen = len(size)
> +            size = atoi(size)
> +
> +            # read the pickle
> +            readlen = 0
> +            p = ''
> +            while readlen < size:
> +                p += self.rfile.read(size - readlen)
> +                readlen = len(p)
> +            d = pickle.loads(p)
> +            self.connection.shutdown(socket.SHUT_RD)
> +        except:
> +            pass
> +
> +        try:
> +            try:
> +                r = do_mirrorlist(d)
> +            except:
> +                raise
> +            message = r['message']
> +            results = r['results']
> +            resulttype = r['resulttype']
> +            returncode = r['returncode']
> +        except Exception, e:
> +            message=u'# Bad Request %s\n# %s' % (e, d)
> +            exception_msg = traceback.format_exc(e)
> +            sys.stderr.write(message+'\n')
> +            sys.stderr.write(exception_msg)
> +            sys.stderr.flush()
> +            returncode = 400
> +            results = []
> +            resulttype = 'mirrorlist'
> +            if d['metalink']:
> +                resulttype = 'metalink'
> +                results = errordoc(d['metalink'], message)
> +
> +        try:
> +            p = pickle.dumps({
> +                'message':message,
> +                'resulttype':resulttype,
> +                'results':results,
> +                'returncode':returncode})
> +            self.connection.sendall(zfill('%s' % len(p), 10))
> +
> +            self.connection.sendall(p)
> +            self.connection.shutdown(socket.SHUT_WR)
> +        except:
> +            pass
> +
> +
> +def sighup_handler(signum, frame):
> +    global logfile
> +    if logfile is not None:
> +        name = logfile.name
> +        logfile.close()
> +        logfile = open(name, 'a')
> +
> +    # put this in a separate thread so it doesn't block clients
> +    if threading.active_count() < 2:
> +        thread = threading.Thread(target=load_databases_and_caches)
> +        thread.daemon = False
> +        try:
> +            thread.start()
> +        except KeyError:
> +        # bug fix for handling an exception when unable to delete from
> +        # _limbo even though it's not in limbo
> +        # https://code.google.com/p/googleappengine/source/browse/trunk/python/google/appengine/dist27/threading.py?r=327
> +            pass
> +
> +
> +def sigterm_handler(signum, frame):
> +    global must_die
> +    signal.signal(signal.SIGHUP, signal.SIG_IGN)
> +    signal.signal(signal.SIGTERM, signal.SIG_IGN)
> +    if signum == signal.SIGTERM:
> +        must_die = True
> +
> +
> +class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer):
> +    request_queue_size = 300
> +    def finish_request(self, request, client_address):
> +        signal.signal(signal.SIGHUP, signal.SIG_IGN)
> +        BaseServer.finish_request(self, request, client_address)
> +
> +
> +def parse_args():
> +    global cachefile
> +    global socketfile
> +    global internet2_netblocks_file
> +    global global_netblocks_file
> +    global debug
> +    global logfile
> +    global pidfile
> +    opts, args = getopt.getopt(
> +        sys.argv[1:], "c:i:g:p:s:dl:",
> +        [
> +            "cache", "internet2_netblocks", "global_netblocks",
> +            "pidfile", "socket", "debug", "log="
> +        ]
> +    )
> +    for option, argument in opts:
> +        if option in ("-c", "--cache"):
> +            cachefile = argument
> +        if option in ("-i", "--internet2_netblocks"):
> +            internet2_netblocks_file = argument
> +        if option in ("-g", "--global_netblocks"):
> +            global_netblocks_file = argument
> +        if option in ("-s", "--socket"):
> +            socketfile = argument
> +        if option in ("-p", "--pidfile"):
> +            pidfile = argument
> +        if option in ("-l", "--log"):
> +            try:
> +                logfile = open(argument, 'a')
> +            except:
> +                logfile = None
> +        if option in ("-d", "--debug"):
> +            debug = True
> +
> +
> +def open_geoip_databases():
> +    global gipv4
> +    global gipv6
> +    try:
> +        gipv4 = GeoIP.open(
> +            "/usr/share/GeoIP/GeoIP.dat", GeoIP.GEOIP_STANDARD)
> +    except:
> +        gipv4=None
> +    try:
> +        gipv6 = GeoIP.open(
> +            "/usr/share/GeoIP/GeoIPv6.dat", GeoIP.GEOIP_STANDARD)
> +    except:
> +        gipv6=None
> +
> +
> +def convert_6to4_v4(ip):
> +    all_6to4 = IP('2002::/16')
> +    if ip.version() != 6 or ip not in all_6to4:
> +        return None
> +    parts=ip.strNormal().split(':')
> +
> +    ab = int(parts[1],16)
> +    a = (ab >> 8) & 0xFF
> +    b = ab & 0xFF
> +    cd = int(parts[2],16)
> +    c = (cd >> 8) & 0xFF
> +    d = cd & 0xFF
> +
> +    v4addr = '%d.%d.%d.%d' % (a,b,c,d)
> +    return IP(v4addr)
> +
> +
> +def convert_teredo_v4(ip):
> +    teredo_std = IP('2001::/32')
> +    teredo_xp  = IP('3FFE:831F::/32')
> +    if ip.version() != 6 or (ip not in teredo_std and ip not in teredo_xp):
> +        return None
> +    parts=ip.strNormal().split(':')
> +
> +    ab = int(parts[6],16)
> +    a = ((ab >> 8) & 0xFF) ^ 0xFF
> +    b = (ab & 0xFF) ^ 0xFF
> +    cd = int(parts[7],16)
> +    c = ((cd >> 8) & 0xFF) ^ 0xFF
> +    d = (cd & 0xFF) ^ 0xFF
> +
> +    v4addr = '%d.%d.%d.%d' % (a,b,c,d)
> +    return IP(v4addr)
> +
> +
> +def load_databases_and_caches(*args, **kwargs):
> +    sys.stderr.write("load_databases_and_caches...")
> +    sys.stderr.flush()
> +    open_geoip_databases()
> +    read_caches()
> +    sys.stderr.write("done.\n")
> +    sys.stderr.flush()
> +
> +
> +def remove_pidfile(pidfile):
> +    os.unlink(pidfile)
> +
> +
> +def create_pidfile_dir(pidfile):
> +    piddir = os.path.dirname(pidfile)
> +    if not piddir:
> +        return
> +    try:
> +        os.makedirs(piddir, mode=0755)
> +    except OSError, err:
> +        if err.errno == 17: # File exists
> +            pass
> +        else:
> +            raise
> +    except:
> +        raise
> +
> +
> +def write_pidfile(pidfile, pid):
> +    create_pidfile_dir(pidfile)
> +    f = open(pidfile, 'w')
> +    f.write(str(pid)+'\n')
> +    f.close()
> +    return 0
> +
> +
> +def manage_pidfile(pidfile):
> +    """returns 1 if another process is running that is named in pidfile,
> +    otherwise creates/writes pidfile and returns 0."""
> +    pid = os.getpid()
> +    try:
> +        f = open(pidfile, 'r')
> +    except IOError, err:
> +        if err.errno == 2: # No such file or directory
> +            return write_pidfile(pidfile, pid)
> +        return 1
> +
> +    oldpid=f.read()
> +    f.close()
> +
> +    # is the oldpid process still running?
> +    try:
> +        os.kill(int(oldpid), 0)
> +    except ValueError: # malformed oldpid
> +        return write_pidfile(pidfile, pid)
> +    except OSError, err:
> +        if err.errno == 3: # No such process
> +            return write_pidfile(pidfile, pid)
> +    return 1
> +
> +
> +def main():
> +    global logfile
> +    global pidfile
> +    signal.signal(signal.SIGHUP, signal.SIG_IGN)
> +    parse_args()
> +    manage_pidfile(pidfile)
> +
> +    oldumask = os.umask(0)
> +    try:
> +        os.unlink(socketfile)
> +    except:
> +        pass
> +
> +    load_databases_and_caches()
> +    signal.signal(signal.SIGHUP, sighup_handler)
> +    # restart interrupted syscalls like select
> +    signal.siginterrupt(signal.SIGHUP, False)
> +    ss = ForkingUnixStreamServer(socketfile, MirrorlistHandler)
> +
> +    while not must_die:
> +        try:
> +            ss.serve_forever()
> +        except select.error:
> +            pass
> +
> +    try:
> +        os.unlink(socketfile)
> +    except:
> +        pass
> +
> +    if logfile is not None:
> +        try:
> +            logfile.close()
> +        except:
> +            pass
> +
> +    remove_pidfile(pidfile)
> +    return 0
> +
> +
> +if __name__ == "__main__":
> +    try:
> +        sys.exit(main())
> +    except KeyboardInterrupt:
> +        sys.exit(-1)
> 
_______________________________________________
infrastructure mailing list
infrastructure@xxxxxxxxxxxxxxxxxxxxxxx
http://lists.fedoraproject.org/admin/infrastructure@xxxxxxxxxxxxxxxxxxxxxxx


