Python第三方模块IPy介绍

Python第三方模块IPy介绍

安装

pip install ipy

IP地址、网段的基本处理

IPy模块包含IP类,使用它可以方便处理绝大部分格式为IPv6及IPv4的网络和地址。
比如通过version方法就可以区分出IPv4与IPv6,如:

>>> import IPy
>>> IPy.IP("10.0.0.0/8").version()
4
>>> IPy.IP("::").version()
6
>>>

通过指定的网段输出该网段的IP个数及所有IP地址清单

>>> ip = IPy.IP('192.168.10.0/24')
>>> print(ip.len())
256
>>> for x in ip:
...     print(x)
...
192.168.10.0
192.168.10.1
192.168.10.2
192.168.10.3
192.168.10.4
192.168.10.5
192.168.10.6
192.168.10.7
192.168.10.8
...
192.168.10.255

下面介绍IP类几个常见的方法,包括反向解析名称、IP类型、IP转换等。

>>> from IPy import IP
>>> ip = IP('192.168.1.20')
>>> ip.reverseNames() #反向解析地址格式
['20.1.168.192.in-addr.arpa.']
>>> ip.iptype() #192.168.1.20为私网类型'PRIVATE'
'PRIVATE'
>>> IP('8.8.8.8').iptype() #8.8.8.8为公网类型
'PUBLIC'
>>> IP('8.8.8.8').int() #转换成整型格式
134744072
>>> IP('8.8.8.8').strHex()#转换成十六制格式
'0x8080808'
>>> IP('8.8.8.8').strBin()#转换成二进制格式
'00001000000010000000100000001000'
>>> print(IP(0x8080808))#十六进制转成IP格式
8.8.8.8
>>>

IP方法也支持网络地址的转换,例如根据IP与掩码生产网段格式,如下

>>> from IPy import IP
>>> print(IP('192.168.1.0').make_net('255.255.255.0'))
192.168.1.0/24
>>> print(IP('192.168.1.0/255.255.255.0',make_net=True))
192.168.1.0/24
>>> print(IP('192.168.1.0-192.168.1.255',make_net=True))
192.168.1.0/24

也可以通过strNormal方法指定不同wantprefixlen参数值以定制不同输出类型的网段。
输出类型为字符串,如下:

>>> IP('192.168.1.0/24').strNormal(0)
'192.168.1.0'
>>> IP('192.168.1.0/24').strNormal(1)
'192.168.1.0/24'
>>> IP('192.168.1.0/24').strNormal(2)
'192.168.1.0/255.255.255.0'
>>> IP('192.168.1.0/24').strNormal(3)
'192.168.1.0-192.168.1.255'
>>>

有时候我们想比较两个网段是否存在包含、重叠等关系。IPy支持类似于数据型数据的比较,以帮助IP对象进行比较,如:

P('10.0.0.0/24') < IP('12.0.0.0/24')
True

判断IP地址和网段是否包含于另一个网段中,如:

>>> '192.168.1.100' in IP('192.168.1.0/24')
True
>>> IP('192.168.1.0/24') in IP('192.168.0.0/16')
True

判断两个网段是否存在重叠,采用IPy提供的overlaps方法,如:

>>> IP('192.168.0.0/23').overlaps('192.168.1.0/24')
1#重叠
>>> IP('192.168.1.0/24').overlaps('192.168.2.0')
0#不重叠
>>>

综合示例:根据输入的IP或子网返回网络、掩码、广播、反向解析、子网数、IP类型等信息。

#!/usr/bin/env python

from IPy import IP

ip_s = input("Please input an IP or net-range:")

ips = IP(ip_s)

if len(ips) > 1:
    print("net:%s" % ips.net())
    print("netmast:%s" % ips.netmask())
    print("broadcast:%s" % ips.broadcast())
    print("reverse address:%s" % ips.reverseNames()[0])
    print("subnet:%s" % len(ips))
else:
    print('reverse address:%s' % ips.reverseNames()[0])


print("hexadecimal:%s" % ips.strHex())
print("binary ip:%s" % ips.strBin())
print("iptype:%s" % ips.iptype())

输出结果如下:

python simple1.py
Please input an IP or net-range:192.168.1.0/24
net:192.168.1.0
netmast:255.255.255.0
broadcast:192.168.1.255
reverse address:1.168.192.in-addr.arpa.
subnet:256
hexadecimal:0xc0a80100
binary ip:11000000101010000000000100000000
iptype:PRIVATE
(venv) ➜ mgr python simple1.py
Please input an IP or net-range:192.168.1.20
reverse address:20.1.168.192.in-addr.arpa.
hexadecimal:0xc0a80114
binary ip:11000000101010000000000100010100
iptype:PRIVATE

Python第三方模块dnspython介绍

Python第三方模块dnspython介绍

安装

pip install dnspython

模块域名解析方法介绍

dnspython提供了一个DNS解析器类–resolver,使用它的query方法来实现域名的查询功能。query方法定义如下:

query(self, qname, rdtype=1, rdclass=1, tcp=False, source=None, raise_on_no_answer=True, source_port=0)

其中,qname参数为查询的域名。rdtype参数用来指定RR资源的类型,常用的有以下几种:

  • A记录,将主机名转换成IP地址;
  • MX记录,邮件交换记录,定义邮件服务器的域名;
  • CNAME记录,指别名记录,实现域名间的映射;
  • NS记录,标记区域的域名服务器及授权子域;
  • PTR记录,反向解析,与A记录相反,将IP转换成主机名;
  • SOA记录,SOA标记,一个起始授权区的定义。

rdclass参数用于指定网络类型,可选的值有IN、CH与HS,其中IN为默认,使用最广泛。
tcp参数用于指定查询是否启用TCP协议,默认为False(不启用)。
source与source_port参数作为指定查询源地址与端口,默认值为查询设备IP地址和0。
raise_on_no_answer参数用于指定当查询无应答时是否触发异常,默认为True。

举例

A记录

#!/usr/bin/env python
import dns.resolver

domain = input("please input an domain:")
A = dns.resolver.query(domain,'A')
for i in A.response.answer:
    for j in i.items:
        print(j.address)

测试结果:

(venv) ➜ dnspython python simple1.py
please input an domain:go2live.cn
59.110.85.65

MX记录

#!/usr/bin/env python
import dns.resolver

domain = input("please input an domain:")
MX = dns.resolver.query(domain, 'MX')
for i in MX:
    print('MX preference=%s, mail exchanger=%s' % (i.preference, i.exchange))

测试结果:

(venv) ➜ dnspython python simple2.py
please input an domain:163.com
MX preference=10, mail exchanger=163mx02.mxmail.netease.com.
MX preference=10, mail exchanger=163mx03.mxmail.netease.com.
MX preference=10, mail exchanger=163mx01.mxmail.netease.com.
MX preference=50, mail exchanger=163mx00.mxmail.netease.com.

NS记录

#!/usr/bin/env python
import dns.resolver

domain = input("please input an domain:")
ns = dns.resolver.query(domain, 'NS')
for i in ns.response.answer:
    for j in i.items:
        print(j.to_text())

测试结果:

(venv) ➜ dnspython python simple3.py
please input an domain:baidu.com
ns4.baidu.com.
ns3.baidu.com.
ns7.baidu.com.
dns.baidu.com.
ns2.baidu.com.

需要注意,域名需要是一级域名。

CNAME记录

#!/usr/bin/env python
import dns.resolver

domain = input("please input an domain:")
cname = dns.resolver.query(domain, 'CNAME')
for i in cname.response.answer:
    for j in i.items:
        print(j.to_text())

测试结果:

(venv) ➜ dnspython python simple4.py
please input an domain:img.go2live.cn
iduxqy8.qiniudns.com.

我的网站把图片资源放到了七牛的dns上。。后面改到了阿里云。

监控代码

步骤:
1. 通过dns解析获得ip地址列表
2. 通过http请求获取网站服务是否正常。

#!/usr/bin/env python
import dns.resolver
import os
import requests


iplist = []
appdomain = "www.a.shifen.com"


def get_iplist(domain=""):
    try:
        A = dns.resolver.query(domain, 'A')
    except Exception as e:
        print("dns resolver error:", str(e))
        return

    for i in A.response.answer:
        for j in i.items:
            iplist.append(j.address)
    return True


def checkip(ip):
    checkurl = ip+":80"
    try:
        r = requests.get('http://'+checkurl)
        if r.status_code == 200  and '<!doctype html>' in r.text.lower():
            print(ip + " [ok]")
        else:
            print(ip + " [Error] ")
    except Exception as e:
        print(e)


if __name__ == '__main__':
    if get_iplist(appdomain) and len(iplist) > 0:
        for ip in iplist:
            checkip(ip)
    else:
        print("dns resolver error.")

输出结果:

(venv) ➜ dnspython python simple5.py
58.217.200.112 [ok]
58.217.200.113 [ok]

域名不能用baidu.com, 因为baidu.com是www.a.shifen.com的别名。
也说明这个程序并不健壮。

oop设计原则SOLID详解

oop设计原则SOLID详解

简介

面向对象编程设计原则有个总结,叫SOLID原则。
再具体下去就是具体的设计模式了。

S=single Responsibility Principle 单一职责原则

动机

在这里,责任被认为是改变的一个原因。这个原则表明,如果我们有两个原因要修改一个类,我们必须将功能分为两个类。每个类将只处理一个责任,如果将来我们需要做一个更改,我们将在相应的类中处理。 当我们需要在具有更多职责的类中进行更改时,更改可能会影响与类的其他职责相关的其他功能。

单一职责原则是一个简单和直观的原则,但在实践中有时很难正确运用。

意图

一个类,应该仅有一个引起它变化的原因。

举例

让我们假设我们需要一个对象来保存电子邮件。我们将使用从下面的示例中的IEmail接口。看起来没啥问题。但仔细看看,我们可以看到,我们的IEmail接口和Email类有2个责任(改变的原因)。
一个是在一些电子邮件协议如pop3或imap中使用该类。如果必须支持其他协议,则应以另一种方式对对象进行序列化,并且应添加代码以支持新协议。
另一个是Content字段,即使内容是字符串,也许我们将来可能支持HTML或其他格式。

如果我们只保留一个类,每一个职责的改变可能会影响到另一个:

  • 添加新协议将创建需要为每个类型的字段添加用于解析和序列化内容的代码。
  • 添加新的内容类型(如html)使我们为每个协议添加代码。

//单一责任原则 - 错误示例

interface IEmail { 
    public void setSender(String sender); 
    public void setReceiver(String receiver); 
    public void setContent(String content); 
} 

class Email implements IEmail { 
    public void setSender(String sender){// set sender; } 
    public void setReceiver(String receiver){//设置接收器; } 
    public void setContent(String content){//设置内容; } 
}

我们可以创建一个新的接口和类,称为IContent和Content来分担责任。每个类只有一个责任给我们更灵活的设计:

  • 添加新协议只会导致电子邮件类中的更改。
  • 添加新类型的内容支持导致仅在Content类中的更改。
//单一责任原则 - 好的示例
interface IEmail { 
    public void setSender(String sender); 
    public void setReceiver(String receiver); 
    public void setContent(IContent content); 
} 

interface Content { 
    public String getAsString(); //用于序列化
} 

class Email implements IEmail { 
    public void setSender(String sender){// set sender; } 
    public void setReceiver(String receiver){//设置接收器; } 
    public void setContent(IContent content){//设置内容; } 
}

结论

单一责任原则代表在应用程序的设计阶段识别类的一种好方法,它提醒您想一个类可以发展的所有方式。只有当应用程序应该如何工作的全部情况都很好理解时,才能实现良好的责任分离。

O=open close 开闭原则

开放-封闭原则,是说软件实例(类、模块、函数等等)应该可以扩展,但是不可修改。

这个原则其实是有两个特征,一个是说‘对于扩展是开放的(open for extension)’,另一个是说‘对于更改是封闭的(Closed for modification)’

动机

聪明的应用程序设计和代码实现应该考虑到在应用程序的开发和维护阶段会有频繁的更改。通常,当向应用程序添加新功能时,会涉及到许多更改。应该将现有代码中的这些更改最小化,因为假设现有代码已经进行单元测试,并且已编写代码中的更改可能以不必要的方式影响现有功能。

开闭原则指出该代码的设计和实现,应该能让新功能的加入以现有的代码最小的变化来完成。设计应该允许添加新功能作为新类,尽可能保持现有代码不变。

意图

像类,模块和函数的软件实体应该是开放的扩展,但关闭修改

例子

下面是违反开放关闭原则的示例。它实现了一个图形编辑器处理不同形状的绘图。很明显,它不遵循开放关闭原则,因为GraphicEditor类必须为每个必须添加的新形状类进行修改。有几个缺点:

  • 对于每个新形状添加的GraphicEditor的单元测试应该重做。
  • 当添加新类型的形状时,添加它的时间将会很长,因为添加它的开发人员应该理解GraphicEditor的逻辑。
  • 添加新形状可能以不期望的方式影响现有功能,即使新形状完美地工作。

// Open-Close Principle  -  Bad example 
class GraphicEditor { 
 
    public void drawShape(Shape s){ 
        if(s.m_type == 1)
            drawRectangle(s); 
        else if(s.m_type == 2)
            drawCircle(s); 
    } 
    public void drawCircle(Circle r){...} 
    public void drawRectangle(Rectangle r){....} 
} 
 
class Shape { 
    int m_type; 
} 
 
class Rectangle extends Shape { 
    Rectangle(){ 
        super.m_type = 1; 
    } 
} 
 
class Circle extends Shape { 
    Circle(){ 
        super.m_type = 2; 
    } 
}

下面是支持开放关闭原则的示例。在新设计中,我们在GraphicEditor中使用抽象draw()方法绘制对象,同时在具体形状对象中来实现。使用开放关闭原则避免了先前设计的问题,因为在添加新的形状类时不会更改GraphicEditor:

  • 无需单元测试。
  • 无需了解GraphicEditor的源代码。
  • 因为绘图代码被移动到具体的形状类,当添加新的功能时,降低了影响旧功能的风险。

// Open-Close Principle  -  Good example 
class GraphicEditor { 
    public void drawShape(Shape s){ 
        s.draw(); 
    } 
} 
 
class Shape { 
    abstract void draw(); 
} 
 
class Rectangle extends Shape { 
    public void draw(){ 
        //绘制矩形
    } 
}

结论

OCP只是一个原则。做出灵活的设计需要花费额外的时间和精力,并且它引入了新的抽象层次,增加了代码的复杂性。因此,这个原则应该应用于最有可能改变的领域。

有许多设计模式可以帮助我们扩展代码而不改变它。例如装饰者模式帮助我们遵循开放原则。此外,工厂方法或观察者模式可用于设计一个易于随现有代码中的最小变化而改变的应用程序。

L=LSP(Liskov’s Substitution Principle) 里氏代换原则

子类型必须能够替换它们的父类型
一个软件实体如果使用的是一个父类的话,那么一定适用于其子类,而且它察觉不出父类对象和子类对象的区别。也就是说,在软件里面,把父类都替换成它的子类,程序的行为没有变化。

动机

一直以来,我们在设计一个程序模块,我们创建一些类层次结构。然后我们通过扩展一些类来创建一些派生类。

我们必须确保新的派生类只是扩展而不替换旧类的功能。否则,新类在现有程序模块中使用时会产生不良影响。

Likov的替换原则声明,如果程序模块使用Base类,那么对Base类的引用可以替换为Derived类,而不会影响程序模块的功能。

意图

派生类型必须能够完全替代其基本类型。

例子

下面是违反Likov替代原则的典型例子。在示例中使用2个类:Rectangle和Square。让我们假设Rectangle对象在应用程序中的某个地方使用。我们扩展应用程序并添加Square类。正方形类由工厂模式返回,基于一些条件,我们不知道将返回什么类型的对象。但我们知道这是一个矩形。我们得到矩形对象,宽度设置为5,高度设置为10,得到面积。对于宽度为5和高度为10的矩形,面积应为50,而结果却是100。



//违反Likov的替代原则
class Rectangle
{
    protected int m_width;
    protected int m_height;

    public void setWidth(int width){
        m_width = width;
    }}

    public void setHeight(int height){
        m_height = height;
    }}


    public int getWidth(){
        return m_width;
    }}

    public int getHeight(){
        return m_height;
    }}

    public int getArea(){
        return m_width * m_height;
    }}  
}}

class Square extends Rectangle 
{
    public void setWidth(int width){
        m_width = width;
        m_height = width;
    }}

    public void setHeight(int height){
        m_width = height;
        m_height = height;
    }}

}}

class LspTest
{
    private static Rectangle getNewRectangle()
    {
        //它可以是一些工厂返回的对象... 
        return new Square();
    }}

    public static void main(String args [])
    {
        Rectangle r = LspTest.getNewRectangle();
        
        r.setWidth(5);
        r.setHeight(10);
        //用户知道r是一个矩形。 
        //假设他能够为基类设置宽度和高度

        System.out.println(r.getArea());
        //现在他惊讶地发现该面积是100而不是50。
    }}
}}

结论

这个原则只是开放关闭原则的一个扩展,这意味着我们必须确保新的派生类是扩展基类而不改变它们的行为。

I=Interface Segregation Principle 接口隔离原则

动机

当我们设计一个应用程序时,我们应该注意如何使一个包含多个子模块的模块抽象化。考虑由类实现的模块,我们可以在接口中完成系统的抽象。但是如果我们想扩展我们的应用程序添加另一个模块,它只包含原始系统的一些子模块,我们被迫实现完整的接口和写一些虚拟方法。这样的接口被命名为fat接口或污染的接口。具有接口污染不是一个好的解决方案,并且可能在系统中引起不适当的行为。

该接口分离原则规定,客户不应该被强迫来实现它们不使用的接口。基于一组方法,每个方法服务一个子模块,而不是一个胖接口许多小接口是优选的。

意图

不应该强制客户端依赖于他们不使用的接口。

例子

下面是一个违反接口隔离原则的例子。我们有一个经理类,代表管理工人的人。我们有两种类型的工人一些平均和一些非常有效的工人。这两种类型的工人工作,他们需要每天吃午饭。但现在有些机器人来到他们工作的公司,但他们不吃,所以他们不需要吃午饭。一个新的机器人类需要实现IWorker接口,因为机器人工作。在另一方面,不必实施它,因为他们不吃。

这就是为什么在这种情况下,IWorker被认为是一个污染的接口。

如果我们保持当前的设计,新的Robot类被迫实现eat方法。我们可以写一个不做任何事情的虚拟类(假设每天吃1秒),并且在应用程序中可能会产生不良效果(例如,管理者看到的报告会报告更多的午餐,而不是人数)。

根据接口分离原则,灵活的设计不会有接口污染。在我们的情况下,IWorker接口应该分为两个不同的接口。


// interface segregation principle - bad example
interface IWorker {
    public void work();
    public void eat();
}

class Worker implements IWorker{
    public void work() {
        // ....working
    }
    public void eat() {
        // ...... eating in launch break
    }
}

class SuperWorker implements IWorker{
    public void work() {
        //.... working much more
    }

    public void eat() {
        //.... eating in launch break
    }
}

class Manager {
    IWorker worker;

    public void setWorker(IWorker w) {
        worker=w;
    }

    public void manage() {
        worker.work();
    }
}

下面是支持接口隔离原则的代码。通过在2个不同的接口中拆分IWorker接口,新的Robot类不再强制实现eat方法。此外,如果我们需要另一个功能的机器人像充电,我们创建另一个有充电方法的接口IRechargeble。


// interface segregation principle - good example
interface IWorker extends Feedable, Workable {
}

interface IWorkable {
    public void work();
}

interface IFeedable{
    public void eat();
}

class Worker implements IWorkable, IFeedable{
    public void work() {
        // ....working
    }

    public void eat() {
        //.... eating in launch break
    }
}

class Robot implements IWorkable{
    public void work() {
        // ....working
    }
}

class SuperWorker implements IWorkable, IFeedable{
    public void work() {
        //.... working much more
    }

    public void eat() {
        //.... eating in launch break
    }
}

class Manager {
    Workable worker;

    public void setWorker(Workable w) {
        worker=w;
    }

    public void manage() {
        worker.work();
    }
}

结论

如果设计已经完成,胖接口可以使用适配器模式进行隔离。
像每个原则一样接口分离原则是一个原则,需要花费额外的时间和精力在设计期间应用它,并增加代码的复杂性。
但它产生灵活的设计。
如果我们要过分地使用应用它,它将导致包含许多只有单个方法接口的代码,因此应该基于经验和常识来识别代码将来可能在哪些该地需要扩展。

D= Dependency依赖反转原则

抽象不应该依赖细节,细节应该依赖于抽象。
说白了,就是要针对接口编程,不要对实现编程

动机

当我们设计软件应用程序时,我们可以考虑低级类实现基本和主要操作(磁盘访问,网络协议,…)的类和高级类封装复杂逻辑(业务流,…)的类。最后一个依赖于低级类。实现这种结构的一种自然方式是编写低级类,一旦我们让它们编写复杂的高级类。由于高级类是根据其他定义的,这似乎是合理的做法。但这不是一个灵活的设计。如果我们需要替换一个低级类会发生什么?
让我们来看一个复制模块的典型例子,它从键盘读取字符并将它们写入打印机设备。包含逻辑的高级类是复制类。低级类是KeyboardReader和PrinterWriter。

在一个糟糕的设计中,高级类直接使用并且在很大程度上依赖于低级类。在这种情况下,如果我们要更改设计以将输出定向到一个新的FileWriter类,我们必须在Copy类中进行更改。(让我们假设它是一个非常复杂的类,有很多逻辑,真的很难测试)。

为了避免这样的问题,我们可以在高级类和低级类之间引入抽象层。由于高级模块包含复杂的逻辑,它们不应该依赖于低级模块,因此不应该基于低级模块来创建新的抽象层。低级模块将基于抽象层创建。

根据这个原则,设计类结构的方法是从高级模块开始到低级模块:
高级类 – >抽象层 – >低级类

意图

  1. 高级模块不应该依赖于低级模块。两者都应该依赖于抽象。
  2. 抽象不应取决于细节。细节应该取决于抽象。

例子

下面是违反依赖反转原则的示例。我们有经理类,它是一个高级类,而低级类称为Worker。我们需要在我们的应用程序中添加一个新模块,以模拟由新的专业工作者雇用决定的公司结构的变化。我们为此创建了一个新类SuperWorker。

让我们假设Manager类相当复杂,包含非常复杂的逻辑。现在我们必须改变它,以引入新的SuperWorker。让我们看看缺点:

  • 我们必须更改Manager类(记住它是一个复杂的,这将涉及时间和精力进行更改)。
  • 某些来自管理器类的当前功能可能会受到影响。
  • 单元测试应重做。

所有这些问题可能需要很多时间来解决,它们可能在旧的功能中引入新的错误。如果应用程序是按照依赖性反转原则设计的,情况会有所不同。这意味着我们设计manager类,一个IWorker接口和实现IWorker接口的Worker类。当我们需要添加SuperWorker类时,我们所要做的就是为其实现IWorker接口。现有类中没有其他更改。

//依赖反转原理 - 错误的示例

class Worker { 

    public void work(){ 

        // .... working 

    } 

} 



class Manager { 

    Worker worker; 



    public void setWorker(Worker w){ 
        worker = w; 
    } 

    public void manage(){ 
        worker.work(); 
    } 
} 

class SuperWorker { 
    public void work(){ 
        // .... working more more 
    } 
}

下面是支持依赖反转原理的代码。在这个新设计中,通过IWorker接口添加了一个新的抽象层。现在来自上面的代码的问题被解决了(考虑到高级逻辑没有变化):

  • 在添加SuperWorkers时,Manager类不需要更改。
  • 最小化风险以影响Manager类中的旧功能,因为我们不更改它。
  • 无需为Manager类重做单元测试。
//依赖反转原则 - 好例子
interface IWorker { 
    public void work(); 
} 

class Manager implements IWorker { 
    public void work(){ 
        // .... working 
    } 
} 

class SuperWorker implements IWorker { 
    public void work(){ 
        // .... working more more 
    } 
} 

class Manager { 
    IWorker worker; 

    public void setWorker(IWorker w){ 
        worker = w; 
    } 

    public void manage(){ 
        worker.work(); 
    } 
}

结论

当应用这个原则时,它意味着高级类不直接与低级类工作,他们使用接口作为抽象层。在这种情况下,不能使用运算符new来在高级类(如果必要)内实例化新的低级对象。相反,可以使用一些Creational设计模式,例如Factory Method,Abstract Factory,Prototype。

模板设计模式是应用DIP原理的示例。

当然,使用这个原则意味着更多的努力,将导致更多的类和接口维护,在一些词语中更复杂的代码,但更灵活。这个原则不应盲目地应用于每个类或每个模块。如果我们有一个更有可能在未来保持不变的类功能,则不需要应用这个原则。

算法分析

算法分析

算法简介

简单来说,所谓算法就是定义良好的计算过程,它取一个或一组值作为输入,并产生出一个或一组值作为输出。亦即,算法就是一系列的计算步骤,用来将输入数据转换成输出结果。

算法分析

算法分析是关于计算机程序性能与资源利用的研究。
计算时间是一种有限的资源,存储空间也是一种有限的资源。
算法尤其关注性能, 这是门关于性能的学问。

循环不变化

循环不变化主要用来帮助我们理解算法的正确性。对于循环不变式,必须证明他的三个性质:

  • 初始化: 它在循环的第一轮迭代开始之前,应该是正确的。
  • 保持: 如果在循环的某一次迭代开始之前它是正确的,那么,在下一次迭代开始之前,它也应该保持正确。
  • 终止:当循环结束时,不变式给了我们一个有用的性质,它有助于表明算法是正确的。

最坏情况和平均情况分析

一般考察算法的最坏情况运行时间。

渐近分析

淅近分析的基本思路是忽略掉那些依赖于机器的常量, 以及,不去检查实际的运行时间,而是关注运行时间的增长。

当n->∞,有 ( \Theta(n^3) < \Theta(n^2) < \Theta(n) < \Theta(lgn) < \Theta(1))
此处<表示消耗时间少,效率更高。

简化分析方法

从极限角度看,我们只关心算法运行时间如何随着输入规模的无限增长而增长。

渐近确界
( \Theta(g(n)) = {f(n): 存在正常数c_1,c_2和n_0, 使对所有的n \geq n_0, 有0 \leq c_1g(n) \leq f(n) \leq c_2g(n) } )
(\Theta(g(n))是一个集合,可以写成f(n) in \Theta(g(n)), 表示f(n)是\Theta(g(n))的元素,不过,通常写成f(n) = \Theta(g(n)) ) 我们说g(n)是f(n)的一个渐近边界。

渐近上界
( O(g(n)) = {f(n): 存在正常数c和n_0, 使对所有的n \geq n_0, 有0 \leq f(n) \leq cg(n) } )

渐近下界
( \Omega(g(n)) = {f(n): 存在正常数c和n_0, 使对所有的n \geq n_0, 有0 \leq cg(n) \leq f(n) } )

o记号
( o(g(n)) = {f(n): 对任意正常数c, 存在n_0 > 0, 使对所有的n \geq n_0, 有0 \leq f(n) \leq cg(n) } )
从直觉上看,在o表示中当n趋于无穷时,函数f(n)相对于g(n)来说不重要了,即
[\lim_{n \rightarrow \infty} \frac{f(n)}{g(n)} = 0]

(\omega记号)
( \omega(g(n)) = {f(n): 对任意正常数c, 存在n_0 > 0, 使对所有的n \geq n_0, 有0 \leq cg(n) \leq f(n) } )
从直觉上看,在o表示中当n趋于无穷时,函数f(n)相对于g(n)来说变得任意大了,即
[\lim_{n \rightarrow \infty} \frac{f(n)}{g(n)} = \infty]

定理

对任意两个函数f(n)和g(n), \(f(n) = \Theta(g(n))当且仅当f(n)=O(g(n)) 和 f(n) = \Omega(g(n))。\)

实际应用中,一般是用渐近上界和渐近下界证明了渐近确界。

等式和不等式中的渐近记号

  • 只出现在等式(不等式)的右边: 表示集合的成员关系。如\(n=O(n^2)\)
  • 一般来说,当出现在某个公式中时, 我们将其解释为一个不在乎其名称的匿名函数。如\(2n^2 + 3n + 1 = 2n^2 + \Theta(n) 即表示2n^2 + 3n + 1 = 2n^2 + f(n), 其中f(n)是某个属于\Theta(n)的函数。\)
  • 有时,出现在等式的左边,例如\(2n^2 + \Theta(n) = \Theta(n^2)\) 这时,我们使用以下规则来解释这种等式:无论等号左边的匿名函数如何选择, 总有办法选取选号右边的匿名函数使等式成立。

递归式的解法

代换法

步骤:

  1. 猜测解的形式。
  2. 用数学归纳法找出使解真正有效的常数。

适用情形:
只能用于解的形式很容易猜的情形。

递归树方法

用代换法不好猜,我们可以用递归树方法获得一个猜想,然后用代换法加以验证。

当用递归式表示分治算法的运行时间时,递归树的方法尤其有用。

主方法

主方法给出求解如下形式的递归式的”食谱”方法:

[ T(n) = aT(n/b) + fn(n)]
其中(a \geq 1 和 b > 1 是常数, f(n)是一个渐近正的函数)。

有三种情况:

  1. \(若对于某常数\varepsilon > 0, 有f(n) = O(n^{log_b{a-\varepsilon}}), 则T(n)=\Theta(n^{log_ba});\)
  2. \(若f(n)=\Theta(n^{log_ba}), 则T(n) = \Theta(n^{log_ba}lgn)\);
  3. \(若对某常数\varepsilon>0, 有f(n)=\Omega(n^{log_b{a+\varepsilon}}), 且对常数c<1与所有足够大的n,\) \(有af(n/b) \leq cf(n), 则T(n)=\Theta(f(n)) \);

需要注意三种情况并没有覆盖所有可能的f(n)。

算法设计

增量方法

插入排序有的就是增量方法。

分治法

分治模式在每一层递归上都有三个步骤:

  • 分解:将原问题分解成一系统子问题。
  • 解决:递归地解决各子问题。若子问题足够小,则直接求解。
  • 合并:将子问题的结果合并成原问题的解。

分治法举例之x的n次方

分治法举例之x的n次方

问题

(求x^n是很简单的事)

1.pow函数

python有pow函数。直接调用

pow(232, 1000)

结果>时间>python xn1.py 0.02s user 0.01s system 89% cpu 0.038 total

2.简单实现

递归

O(n)的时间复杂度

def xn(x, n):
    if n == 0:
        return 1
    elif n == 1:
        return x
    else:
        return x * xn(x, n-1)



print(xn(232, 1000))

时间>RuntimeError: maximum recursion depth exceeded
python xn.py 0.03s user 0.03s system 81% cpu 0.071 total

超了递归深度。。

循环

def xn(x, n):
    if n == 0:
        return 1
    else:
        ret = 1
        while n > 0:
            ret *= x
            n -= 1
        return ret




print(xn(232, 1000))

时间>python xn3.py 0.02s user 0.02s system 89% cpu 0.046 total

3.分治法实现

def xn(x, n):
    if n == 0:
        return 1
    elif n == 1:
        return x
    else:
        if n%2 == 0:
            return xn(x, int(n/2)) * xn(x, int(n/2))
        else:
            return xn(x, int(n/2)) * xn(x, n-int(n/2))



print(xn(232, 1000))

时间>
python xn2.py 0.02s user 0.01s system 83% cpu 0.046 total

实验数据

实验1数据: 232的1000次方。

方法 时间
pow 0.038
一般实现(递归) 0.071超递归栈,无结果
一般实现(循环) 0.046
分治法 0.046

实验2数据: 232的10000次方。

方法 时间
pow 0.056
一般实现(循环) 0.080
分治法 0.070

实验2数据: 232的100000次方。

方法 时间
pow 1.195
一般实现(循环) 3.469
分治法 1.411

结论

  1. 递归实现需要注意到递归栈深度问题。
  2. 能用库函数就尽量用库函数。库函数一般是C实现,实现的比较高效。 没有库函数时,数据量小可以用最简单的方式实现,但数据量大时,一定得想有什么算法适合解决该问题。

算法之斐波那契数

算法之斐波那契数

定义

斐波那契数递归地定义为:
[ F_0 = 0\;F_1 =1\;F_i = F_{i-1} + F_{i-2}\;当i \geq 2]

递归解法

这是根据定义直接求解。以指数形式增长。

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

循环写法,cache结果

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    elif n == 2:
        return 1
    else:
        n2 = 0
        n1 = 1
        r = 1
        while n > 2:
            n2 = n1
            n1 = r
            r = n2 + n1
            n -= 1
    return r

利用数学结果1

事实上,有这么一个数学结论:

[F_i = \frac{\phi^i-\hat\phi^i}{\sqrt{5}}, 其中\phi=\frac{1+\sqrt{5}}{2}, \hat\phi=\frac{1-\sqrt{5}}{2}]

似乎能够利用分治法举例之x的n次方获得解。但仔细想想,这公式里有(\sqrt{5}), 肯定会在计算中丢失精度,而斐波那契数一定是个整数。这玩意并不能真正用程序来实现。

利用数学结果2

还有这么一个数学公式:(可以用归纳法证明)

[ \begin{pmatrix}
F_{n+1} & F_n \
F_n & F_{n-1}
\end{pmatrix} =
\begin{pmatrix}
1 & 1 \
1 & 0
\end{pmatrix}^n
]

而计算一个数的n次方,可以利用分治法举例之x的n次方了,可以知道时间复杂度是(\Theta(logn))

一用了numpy的矩阵相乘,但是在计算到93时出了错误,应该是数字越界问题。重新实现了下。

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1

    class metrix:
        def __init__(self, i1, i2, j1, j2):
            self.i1 = i1
            self.i2 = i2
            self.j1 = j1
            self.j2 = j2

        def mul(self, another):
            i1 = self.i1 * another.i1 + self.i2 * another.j1
            i2 = self.i1 * another.i2 + self.i2 * another.j2
            j1 = self.j1 * another.i1 + self.j2 * another.j1
            j2 = self.j1 * another.i2 + self.j2 * another.j2
            return metrix(i1, i2, j1, j2)

        def __str__(self):
            return "(i1=%ld,i2=%ld,j1=%ld,j2=%ld)" % (
                self.i1, self.i2, self.j1, self.j2)

        @staticmethod
        def mulmetirx(a, b):
            return a.mul(b)

    def metrix_power(x, n):
        if n == 0:
            return 1
        elif n == 1:
            return x
        else:
            return metrix.mulmetirx(metrix_power(x, int(n/2)),
                                    metrix_power(x, n-int(n/2)))
    base = metrix(1, 1, 1, 0)
    result = metrix_power(base, n)
    return result.i2

实验

实现 规模 结果 时间 时间复杂度
递归实现 10 55 0.036 指数级
循环+cache 10 55 0.035 O(n)
矩阵算法 10 55 0.251 O(lgn)
递归实现 100 >83秒后终止 指数级
循环+cache 100 354224848179261915075 0.037 O(n)
矩阵算法 100 354224848179261915075 0.061 O(lgn)
循环+cache 1000 0.036 O(n)
矩阵算法 1000 0.061 O(lgn)
循环+cache 10000 0.069 O(n)
矩阵算法 10000 0.111 O(lgn)
循环+cache 100000 0.279 O(n)
矩阵算法 100000 0.627 O(lgn)
循环+cache 1000000 18.941 O(n)
矩阵算法 1000000 6.586 O(lgn)

结论

最简单的写法简直惨不忍睹,而高效的算法太依赖于数学了。算法和数学真的不分家,有志于玩算法的同学,学好数学吧。
更多的时候,我们要选择的不是最优的,而是最合适的。
譬如在1万以下,O(n)的效率居然最高。直到到了10万,O(lgn)才比O(n)更好。

分治法举例之矩阵乘法

分治法举例之矩阵乘法

前言

矩阵按定义直接实现是比较直接简单的。时间复杂度也可以直接得出来是(O(n^3))

直接实现

class matrix:
    '''
    为了简单起见,没有对数据做校验。
    假设矩阵就是nxn的。
    '''
    def __init__(self, data):
        if not data or not hasattr(data, '__getitem__'):
            raise ValueError("data not valid! %s" % data)
        self.data = data
        self.rows = len(data)
        self.cols = max(map(lambda row: len(row), data))

    def __mul__(self, another):
        if self.cols != another.rows:
            raise ValueError("not valid ddata ,only support mxn * nxp")
        ret = matrix([[0 for _ in range(another.cols)] for _ in range(self.rows)])
        for i in range(self.rows):
            for j in range(another.cols):
                num = 0
                for k in range(self.cols):
                    num += self._getitem(i, k) * another._getitem(k, j)
                ret._setitem(i, j, num)
        return ret

    def _getitem(self, i, j):
        if i >= self.rows or j >= self.cols:
            raise IndexError("index out of boundary,i=%d,j=%d, %s" % (
                i, j, self.data))
        try:
            return self.data[i][j]
        except Exception:
            return 0

    def _setitem(self, i, j, value):
        if i >= self.rows or j >= self.cols:
            raise IndexError("index out of boundary,i=%d,j=%d,value=%s, %s" % (
                i, j, str(value), self.data))
        if j >= len(self.data[i]):
            fill = self.cols - len(self.data[i])
            self.data[i].extend([0 for _ in range(fill)])
        self.data[i][j] = value

    def __str__(self):
        return "(rows:%d, cols:%d)->%s" % (self.rows, self.cols, self.data)

直接应用分治法

很简单的想法是把矩阵分成n/2的4块。于是
A*B = C
变成
[
\begin{pmatrix}
a & b \
c & d
\end{pmatrix}
*
\begin{pmatrix}
e & f \
g & h

\end{pmatrix}

\begin{pmatrix}
r & s \
t & u
\end{pmatrix}
]

r = ae + bg
s = af + bh
t = ce + dg
u = cf + dh

于是有(T(n) = 8T(n/2)+O(n^2))
(O(n^{log_ba}) = O(n^{log_28})=O(n^3) > f(n))
满足主定理的第一种情况,于是(T(n) = \Theta(n^3)),并没有比直接实现快。

斯特拉森算法

斯特拉森于1969年提出的算法,运用分治策略并加上一些处理技巧设计出的一种矩阵乘法。
他巧妙地8变成了7。于是达到了(T(n) = \Theta(n^{log_27})\approx\Theta(n^{2.81}))
这还不是目前理论上最好的,暂时最好的达到了(T(n) =\Theta(n^{2.376}))

看一下他是怎么玩的。

P1 = (a+d) * (e+h)
P2 = (c+d) * e
P3 = a * (f-h)
P4 = d * (g-e)
P5 = (a+b) * h
P6 = (c-a) * (e+f)
P7 = (b-d) * (g+h)

利用此7个式子即可得到原来的r,s,t,u
r = P1 + P4 – P5 + P7
s = P3 + P5
t = P2 + P4
u = P1 + P3 -P2 + P6
验证一下u看看

u = P1 + P3 -P2 + P6
= (a+d) * (e+h) + a * (f-h) -((c+d) * e) + (c-a) * (e+f)
=ae + ah + de + dh + af - ah -ce - de + ce + cf -ae -af
= dh + cf

正确

代码实现如下:

#!/usr/bin/env python
from enum import Enum, IntEnum, unique
import sys

class matrix:
    '''
    为了简单起见,没有对数据做校验。
    假设矩阵就是nxn的。
    '''
    def __init__(self, data):
        if not data or not hasattr(data, '__getitem__'):
            raise ValueError("data not valid! %s" % data)
        self.data = data
        self.rows = len(data)
        self.cols = max(map(lambda row: len(row), data))
        if self.rows != self.cols:
            raise ValueError("only support nxn matrix, and n can continue divide by 2 util 1")

    def __add__(self, another):
        if self.rows != another.rows:
            raise ValueError("not valid ddata ,only support nxn * nxn")
        ret = matrix([[0]*self.rows for _ in range(self.rows)])
        for i in range(self.rows):
            for j in range(self.rows):
                ret._setitem(i, j, self._getitem(i, j) + another._getitem(i, j))

        return ret

    def __sub__(self, another):
        if self.rows != another.rows:
            raise ValueError("not valid ddata ,only support nxn * nxn")
        ret = matrix([[0]*self.rows for _ in range(self.rows)])
        for i in range(self.rows):
            for j in range(self.rows):
                ret._setitem(i, j, self._getitem(i, j) - another._getitem(i, j))

        return ret

    def __mul__(self, another):
        if self.rows != another.rows:
            raise ValueError("not valid ddata ,only support nxn * nxn")
        ret = matrix([[0]*self.rows for _ in range(self.rows)])
        if self.rows == 2:
            for i in range(self.rows):
                for j in range(another.cols):
                    num = 0
                    for k in range(self.cols):
                        num += self._getitem(i, k) * another._getitem(k, j)
                    ret._setitem(i, j, num)
        else:
            a = self._divide(matrix.DIRECTION.LEFT_TOP)
            b = self._divide(matrix.DIRECTION.RIGHT_TOP)
            c = self._divide(matrix.DIRECTION.LEFT_BOTTOM)
            d = self._divide(matrix.DIRECTION.RIGHT_BOTTOM)

            e = another._divide(matrix.DIRECTION.LEFT_TOP)
            f = another._divide(matrix.DIRECTION.RIGHT_TOP)
            g = another._divide(matrix.DIRECTION.LEFT_BOTTOM)
            h = another._divide(matrix.DIRECTION.RIGHT_BOTTOM)

            p1 = (a+d)*(e+h)
            p2 = (c+d)*e
            p3 = a * (f-h)
            p4 = d * (g-e)
            p5 = (a+b)*h
            p6 = (c-a)*(e+f)
            p7 = (b-d)*(g+h)

            r = p1 + p4 - p5 + p7
            s = p3 + p5
            t = p2 + p4
            u = p1 + p3 - p2 + p6

            ret._merge(matrix.DIRECTION.LEFT_TOP, r)
            ret._merge(matrix.DIRECTION.RIGHT_TOP, s)
            ret._merge(matrix.DIRECTION.LEFT_BOTTOM, t)
            ret._merge(matrix.DIRECTION.RIGHT_BOTTOM, u)

        return ret

    @unique
    class DIRECTION (IntEnum):
        LEFT_TOP = 1
        LEFT_BOTTOM = 2
        RIGHT_TOP = 3
        RIGHT_BOTTOM = 4

    def _divide(self, direction):
        ret = matrix([[0]*int(self.rows/2) for _ in range(int(self.rows/2))])
        row_start = col_start = 0
        if direction == matrix.DIRECTION.LEFT_TOP:
            row_start = 0
            col_start = 0
        elif direction == matrix.DIRECTION.LEFT_BOTTOM:
            row_start = int(self.rows/2)
            col_start = 0
        elif direction == matrix.DIRECTION.RIGHT_TOP:
            row_start = 0
            col_start = int(self.cols/2)
        else:
            row_start = int(self.rows/2)
            col_start = int(self.cols/2)

        for i in range(ret.rows):
            for j in range(ret.cols):
                item = self._getitem(i+row_start, j+col_start)
                ret._setitem(i, j, item)

        return ret

    def _merge(self, direction, another):
        row_start = col_start = 0
        if direction == matrix.DIRECTION.LEFT_TOP:
            row_start = 0
            col_start = 0
        elif direction == matrix.DIRECTION.LEFT_BOTTOM:
            row_start = int(self.rows/2)
            col_start = 0
        elif direction == matrix.DIRECTION.RIGHT_TOP:
            row_start = 0
            col_start = int(self.cols/2)
        else:
            row_start = int(self.rows/2)
            col_start = int(self.cols/2)

        for i in range(another.rows):
            for j in range(another.cols):
                item = another._getitem(i, j)
                self._setitem(i+row_start, j+col_start, item)



    def _getitem(self, i, j):
        if i >= self.rows or j >= self.cols:
            raise IndexError("index out of boundary,i=%d,j=%d, %s" % (
                i, j, self.data))
        try:
            return self.data[i][j]
        except Exception:
            return 0

    def _setitem(self, i, j, value):
        if i >= self.rows or j >= self.cols:
            raise IndexError("index out of boundary,i=%d,j=%d,value=%s, %s" % (
                i, j, str(value), self.data))
        if j >= len(self.data[i]):
            fill = self.cols - len(self.data[i])
            self.data[i].extend([0 for _ in range(fill)])
        self.data[i][j] = value

    def __str__(self):
        return "(rows:%d, cols:%d)->%s" % (self.rows, self.cols, self.data)

测试结果

方法 规模 时间
直接计算 8×8 0.054
拉特斯森 8×8 0.095
直接计算 16×16 0.063
拉特斯森 16×16 0.117
直接计算 32×32 0.090
拉特斯森 32×32 0.454
直接计算 64×64 0.419
拉特斯森 64×64 2.953
直接计算 128×128 2.946
拉特斯森 128×128 20.547
直接计算 256×256 24.835
拉特斯森 256×256 2:15.15
直接计算 512×512 3:15.98

总结

估计是我实现的问题,比预期结果要差,看视频里说的是到32就差不多了。
不过从上也可以看出来,拉特斯森增长的速度没有直接计算的快,迟早性能会更好。

另外,直观上感觉矩阵乘法是没法优化。但事实上可以。
这说明真没有那么多做不到的事,可能只是现在的你做不来,说不定有人能做到,说不定做到的那个人就是未来的你。

高并发处理框架-Tornado入门

高并发处理框架-Tornado入门

简介

Tornado是高性能并发框架。适合做api服务。并且他内置的http服务器足够强大,可直接用于生产环境中。

安装

简单地通过pip install tornado即可安装。

协程的使用。

调用方式

在tornado中,协程通过下面三种方式调用:

  1. 在本身是协程的函数内通过yield关键字调用。
  2. 在IOLoop尚未启动时,通过IOLoop的run_sync()函数调用。(run_sync会帮助你完成启动IOLoop,执行函数调用和关闭IOLoop的工作)
  3. 在IOLoop已经启动时,通过IOLoop的spawn_callback()函数调用。(无返回值。只适合不需要返回值的函数使用)

防止阻塞

为防止在协程中调用了阻塞函数,从而导致性能下降。可以在协程中用线程池。

from concurrent.futures import ThreadPoolExecutor

thread_pool = ThreadPoolExecutor(2)

def mySleep(count):
    import time
    for i in range(count):
        time.sleep(1)

@gen.coroutine
def call_blocking():
    print("start of call blocking")
    yield thread_pool.submit(mySleep, 10)
    print("end of call_blocking")

在协和中等待多个异步调用

把这些调用组织成list/dict传给yield即可。

如下所示:

from tornado import gen
from tornado.httpclient import AsyncHTTPClient

@gen.coroutine
def coroutine_visit():
    http_client = AsyncHTTPClient()
    dict_response = yield {
        "baidu":http_client.fetch("www.baidu.com"),
        "sina":http_client.fetch("www.sina.com"),
        "163":http_client.fetch("www.163.com"),
        "google":http_client.fetch("www.google.com"),
        }
    print(dict_response['sina'].body)

Tornado开发介绍

helloword app

三步曲:
1. 创建app
2. 监听端口
3. 启动事件循环。

完整代码如下:

import tornado.ioloop #事件循环
import tornado.web #基本控件

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello world")

def main():
    app = tornado.web.Application([r"/", MainHandler]) #数组里是定义路由
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

if __name__ == '__main__':
    main()

在这里测试了下。在我的macbook(2核+8G内存)下。python+tornado在请求100次,并发10的情况下每秒处理请求数是833
而php7+yii2是37。同样只是简单地输出”hello world”。

路由

路由配置没啥好说的,都一样。字符串/正则。其中需要提取的需要用()分组。

RequestHandler

这个是重点。

请求前,请求后等处理

可以直接参考源码。这里做个大概介绍。
请求前会调用prepare方法。
请求后会调用on_finish方法。
如果需要其它参数,譬如数据库。可以通过配置路由时,再传一个字典参数。

restfulApi

感觉完全是为这个而生的呀。
直接重写RequestHandler相应的方法即可。(get/head/post/delete/patch/put/options)
关于restfulapi可以参考.

获取参数

有两种参数,一种是get请求的,在url中。另一种是post请求的。在Body里面。
get_argument()/get_arguments() 是综合了两种。
get_query_argument()/get_query_arguments() 是url中的。
get_body_argument()/get_body_arguments() 是body中的。
说明下。下面带s的是用于获取数组的。
get_cookie() 获取cookie
request属性可以获取其它信息,如请求方的ip地址,host, method, uri, path,query,headers, body,cookies等。
其中如果发起方用的是json格式的数据,这里要用到body,而上面的3对argument方法无效。

响应

set_status() 设置状态码
set_header()/add_header() 设置http头信息
write()直接输出。当参数是字典时,自动转成json格式,并且Content_Type也会设置成application/json
render()渲染tempalte后输出。
redirect()重定向。

改变同步代码

  1. 异步化:针对RequestHandler的处理函数使用@tornado.web.asynchronous修饰器。
  2. 协程化:针对RequestHandler的处理函数使用@ado.gen.coroutine修饰器,将默认的同步机制改为协程机制。

算法排序之Python实现

算法排序之Python实现

前言

排序是最经典的问题,基本上玩算法入门就是排序。
python本身有sorted函数。不过下面我们自己实现一下。
以下数据均是在我的同一台macbook上测试的。从结果也可以看出算法的重要性。

排序按模型分类有:

  1. 每次比较只有2个元素。包括插入排序,快速排序,堆排序,归并排序,冒泡排序。他们最好的效率只能到O(nlgn).可以通过画决策树证明。 一共有n!个节点。高度h的数最多有\(2^h\)个节点。所以有\(2^h \geq n!\)。通过斯特林公式\(n!=\sqrt{2πn}(\frac{n}{e})^n(1+\Theta(\frac{1}{n}))\) 可以得证。
  2. 对数据做些操作。譬如计数排序。

1.插入排序

这个算法和打牌很像,玩过牌的应该能很快理解。不多解释,看python的简单实现。
其最坏情况是( \Theta(n^2) )

#升序
def insert_sort(literal):
    if not literal or len(literal) == 1:
        return literal

    sorted = literal[:1]

    for i in range(1, len(literal)):
        v = literal[i]
        sorted.append(0)
        for j in range(i-1, -1, -1):
            if v < sorted[j]:
                sorted[j+1] = sorted[j]
            else:
                sorted[j+1] = v
                break

    return sorted

测试了一万条数据:

python insert_sort.py 6.66s user 0.45s system 98% cpu 7.192 total

2. 选择排序

选择排序(Selection sort)是一种简单直观的排序算法。它的工作原理是每一次从待排序的数据元素中选出最小(或最大)的一个元素,存放在序列的起始位置,直到全部待排序的数据元素排完。

def select_sort(data):
    def swap(i, j):
        temp = data[i]
        data[i] = data[j]
        data[j] = temp

    for i in range(len(data)):
        min = data[i]
        index = i
        for j in range(i, len(data)):
            if data[j] < min:
                min = data[j]
                index = j
        swap(i, index)
    return data

3. 归并排序

归并排序描述起来好简单。就是把规模分成两组,一组都小于某个数n, 另一组都大于等于n。于是规模为n的问题,变成了2个n/2规模的问题。一次次这样划分下去,直到只剩下一个元素,一个元素当然是排好充的,然后再两两合并,直到得到原始问题的解。

其最坏情况是( \Theta(nlgn) )

这个思想是分治法

下面是python的实现:

#升序
def merge_sort(literal):

    def divide(literal):
        end = int(len(literal)/2)
        return (literal[:end], literal[end:])

    def conquer(literal):
        return merge_sort(literal)

    def merge(literal1, literal2):
        ret = []
        len1 = len(literal1)
        len2 = len(literal2)
        i1 = 0
        i2 = 0
        while len1 or len2:
            if not len1 and len2:
                ret.extend(literal2[i2:])
                return ret
            if not len2 and len1:
                ret.extend(literal1[i1:])
                return ret

            v1 = literal1[i1]
            v2 = literal2[i2]
            if v1 <= v2:
                ret.append(v1)
                len1 -= 1
                i1 += 1
            else:
                ret.append(v2)
                len2 -= 1
                i2 += 1
        return ret

    if not literal or len(literal) == 1:
        return literal

    literal1, literal2 = divide(literal)
    literal1 = conquer(literal1)
    literal2 = conquer(literal2)
    return  merge(literal1, literal2)

测试了一万条数据:

python merge_sort.py 0.22s user 0.02s system 84% cpu 0.283 total

之所以divide写的不那么好看,是为了一定要把问题分解。否则遇到 [1, 1 ,1]这样的数据就悲剧了。

4. 冒泡排序

它重复地走访过要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。走访数列的工作是重复地进行直到没有再需要交换,也就是说该数列已经排序完成。
这个算法的名字由来是因为越大的元素会经由交换慢慢“浮”到数列的顶端,故名。

#!/usr/bin/env python
import sys
import random
def bubble_sort(data):
    def swap(i, j):
        temp = data[i]
        data[i] = data[j]
        data[j] = temp

    exchanged = False
    for i in range(len(data)):
        for j in range(len(data)-1, i, -1):
            if data[j] < data[j-1]:
                swap(j, j-1)
                exchanged = True
        if not exchanged:
            return data
    return data

5. 堆排序

堆排序(Heapsort)是指利用堆积树(堆)这种数据结构所设计的一种排序算法,它是选择排序的一种。可以利用数组的特点快速定位指定索引的元素。堆分为大根堆和小根堆,是完全二叉树。大根堆的要求是每个节点的值都不大于其父节点的值,即A[PARENT[i]] >= A[i]。在数组的非降序排序中,需要使用的就是大根堆,因为根据大根堆的要求可知,最大的值一定在堆顶。

时间效率是(O(n\lg{n})),但实践中不如快速排序。不过用来实现优先级队列非常好。

#!/usr/bin/env python
import sys
import random


def heap_sort(data):
    datalength = len(data)
    data.insert(0, 0)

    def parent_index(i):
        return i >> 1

    def left_index(i):
        return i << 1

    def right_index(i):
        return left_index(i)+1

    def max_heapfy(heap_size, i):
        left = left_index(i)
        right = right_index(i)
        max = data[i]
        key = i
        if left <= heap_size and data[left] > max:
            max = data[left]
            key  = left

        if right <= heap_size and data[right] > max:
            max = data[right]
            key = right
        if key != i:
            swap(i, key)
            max_heapfy(heap_size, key)


    def build_heap():
        for i in range(int(datalength/2), 0, -1):
            max_heapfy(datalength, i)


    def swap(i, j):
        temp = data[i]
        data[i] = data[j]
        data[j] = temp

    build_heap()
    for i in range(datalength, 0, -1):
        swap(i, 1)
        max_heapfy(i-1, 1)


    return data[1:]

算法描述里索引是从1开始的,我这里取了个巧。补了个无用的0,来达到索引从1开始的目的。

6. 快速排序

快速排序和归并排序差不多。区别是归并排序只是把一个规模为n的问题划分成2个规模为n/2的子问题。
而快速排序划分的两个子问题,本身是有序的。即子问题A中的数都要小于等于子问题B中的数,这样在合并问题时更简单。

由于python的特性。做切片操作其实是做的复制工作。
我这里会提供二个快速排序的实现。供参考。

实现1

def quick_sort(literal):

    def divide(literal):
        v1 = literal[0]
        v2 = literal[1]
        v = max(v1, v2)
        literal1 = []
        literal2 = []
        if v1 == v2:
            literal1.append(v1)
            literal2.append(v2)
            for i in literal[2:]:
                if i < v:
                    literal1.append(i)
                else:
                    literal2.append(i)
        else:
            for i in literal:
                if i < v:
                    literal1.append(i)
                else:
                    literal2.append(i)

        return (literal1, literal2)

    def conquer(literal):
        return quick_sort(literal)

    def merge(literal1, literal2):
        ret = []
        ret.extend(literal1)
        ret.extend(literal2)
        return ret

    if not literal or len(literal) == 1:
        return literal

    literal1, literal2 = divide(literal)
    literal1 = conquer(literal1)
    literal2 = conquer(literal2)
    return  merge(literal1, literal2)

实现二的改进是原地排序

这种方式少了copy的代价,占用的空间更少,少了合并的步骤,效率也会更高。
我有个问题没有想清楚,这个程序和上面的程序差不多,都是递归实现,但是上面的程序可以排序100万个数都没问题,这个程序排序到2万个数就报堆栈溢出了。暂时放个问号????

#升序, 原地排序
def quick_sort(literal, start, end):
    def swap(i, j):
        temp = literal[i]
        literal[i] = literal[j]
        literal[j] = temp

    def divide(start, end):
        i = start
        key = literal[start]

        for j in range(start+1, end):
            if literal[j] <= key:
                swap(j,i+1)
                i += 1
        swap(i, start)
        return i+1


    if not literal or len(literal) <= 1:
        return
    if (end-start) <= 1:
        return
    index = divide(start, end)
    quick_sort(literal, start, index)
    quick_sort(literal, index, end)
    return literal

7. 计数排序

这个有个前提条件。必须是整数范围,同时最大的数不能太大。因为效率是O(n+k), 所以k不能太大。
远远小于n时,效率非常高,为线性排序。

参考实现:

def count_sort(alist, k):
    '''
    要求元素都比较小。时间效率是O(k+n),k为最大值。
    '''
    countList = [0 for _ in range(k+1)]
    for v in alist:
        countList[v] += 1
    retList = [0 for _ in range(len(countList))]
    for i, v in enumerate(countList):
        retList.extend([i for _ in range(v)])
    return retList

8. 基数排序

基数排序要用到计数排序。先对最后一位排序,再对前一位排序,直到排序完成。
同样要求是整数。好的是要求的数字大小范围没有计数排序那么小。

理论上这是最佳的排序算法,但是在实践中,最优秀的是快速排序
其时间复杂度为(O(n\log_rm)),其中r为所采取的基数,而m为堆数,在某些时候,基数排序法的效率高于其它的稳定性排序法。

上面实现的计数排序是偷懒的做法。因为是新生成的元素,而这里只是对某位排序,需要保留原数字,故要实现完整版的计数排序。

# -*- coding:utf-8 -*-
#!/usr/bin/env python
import sys
import random

def radix_sort(data, length):
    '''
    这个是简单版本,只实现了十进制的。
    '''
    def count_sort(alist, p):
        '''
        要求元素都比较小。时间效率是O(k+n),k为最大值。p为要排序的位置
        '''
        def get_position_value(v, p):
            for i in range(1, p+1):
                left = v % 10
                v = v/10
            return left
        countList = [0 for _ in range(10)]
        for v in alist:
            countList[get_position_value(v, p)] += 1
        sum = 0
        #calculate the position
        for i in range(10):
            sum += countList[i]
            countList[i] = sum

        retList = [0 for _ in range(len(alist))]
        for v in alist[::-1]:
            index = countList[get_position_value(v, p)]
            retList[index-1] = v
            countList[get_position_value(v, p)] -= 1
        return retList

    for i in range(1, length+1):
        data = count_sort(data, i)

    return data

9. 桶排序

算法经典问题之第k大/小问题(寻找中位数)

算法经典问题之第k大/小问题(寻找中位数)

问题描述

从n个未排序的数中找出第k小的数。
有直接解法是先排序再拿第k个数即可,但是最好的算法也要(O(n\lg{n}))。
下面的解法更优。

解法

这个其实是利用的分治法思考,和二分查找很像。
和快速排序同样的过程,先随机选一个数,然后把比它小的放到左边,比它大的放到右边。设排完后这个数的索引是m,
对比m和k,
如果m==k,说明这个数就是我们要找的。
如果m<k, 说明前m个数都比要找的数小。从而问题变为从右边的数中找到第k-m小的数。
如果m>k,说明要找的数在前m个数中,从而问题变为从右边的数中找到第k小的数。

按此算法,它的时间效率是O(n)。

python实现

#!/usr/bin/env python
import random


def min_k(data, k):

    def swap(i, j):
        temp = data[i]
        data[i] = data[j]
        data[j] = temp

    n = len(data)
    s = random.randint(0, n-1)
    swap(s, 0)
    j = 1
    key = data[0]
    for i in range(1, n):
        if data[i] < key:
            swap(j, i)
            j += 1
    swap(j-1, 0)
    if j == k:
        return data[j-1]
    elif j > k:
        return min_k(data[0:j-1], k)
    else:
        return min_k(data[j:], k-j)