使用 Flask 搭建静态博客

现在流行的静态博客/网站生成工具有很多,比如 Jekyll, Pelican, Middleman, Hyde 等等,StaticGen 列出了目前最流行的一些静态网站生成工具。

我们的内部工具由 Python/Flask/MongoDB 搭建,现在需要加上文档功能,写作格式是 Markdown,不想把文档放到数据库里,也不想再弄一套静态博客工具来管理文档,于是找到了 Flask-FlatPages 这个好用的 Flask 模块。熟悉 Flask 的同学花几分钟的时间就可以用搭建一个简单博客,加上 Bootstrap 的帮助,不到一小时内就可以用 Flask-Flatpages 弄个像模像样的网站出来。

创建开发环境

首先我们需要 pip,在 Mac 上最简单的安装办法是:

$ sudo easy_install pip
$ sudo easy_install virtualenv

如果你在 Mac 上用 Homebrew 包管理工具的话的话,也可以用 brew 升级 Python 和安装 pip:

$ brew update
$ brew install python

创建一个 blog 目录、生成 Python 独立虚拟环境并在这个环境里安装需要的 Flask, Flask-FlatPages 模块:

$ mkdir blog
$ cd blog

$ virtualenv flask
New python executable in flask/bin/python
Installing setuptools, pip...done.

$ flask/bin/pip install flask
$ flask/bin/pip install flask-flatpages

在 blog 目录下我们分别新建几个目录:static 用来存放 css/js 等文件,templates 用来存放 flask 要用的 Jinja2 模版,pages 用来存放我们静态博客(Markdown 格式):

$ mkdir -p app/static app/templates app/pages

程序

主程序 blog.py 的功能是,导入必要的模块、配置 Flask-FlatPages 模块需要的参数、创建 Flask 应用、写几个 URL 路由函数,最后运行这个应用:

$ vi app/blog.py
#!flask/bin/python
from flask import Flask, render_template
from flask_flatpages import FlatPages

DEBUG = True
FLATPAGES_AUTO_RELOAD = DEBUG
FLATPAGES_EXTENSION = '.md'

app = Flask(__name__)
app.config.from_object(__name__)
flatpages = FlatPages(app)

@app.route('/')
def index():
    pages = (p for p in flatpages if 'date' in p.meta)
    return render_template('index.html', pages=pages)

@app.route('/pages/<path:path>/')
def page(path):
    page = flatpages.get_or_404(path)
    return render_template('page.html', page=page)

if __name__ == '__main__':
    app.run(port=8000)

模版

在 Python 中直接生成 HTML 很繁琐并不好玩(那是上个世纪90年代的 PHP 搞的事情),在现代社会,我们使用模版引擎,Flask 已经自动配置好了 Jinja2 模版,使用方法 render_template() 来渲染模版就可以了。Flask 会默认在 templates 目录里中寻找模版,我们只需要创建几个模版文件就可以了,这里我们创建 base.html, index.html 和 page.html.

$ vi app/templates/base.html
<!doctype html>
<html>
<head>
    <meta charset="utf-8">
    <title>vpsee.com static blog</title>
</head>

<body>
    <h1><a href="{{ url_for("index") }}">vpsee.com blog</a></h1>
    {% block content %}
    {% endblock content %}
</body>
</html>

代码里 extends “base.html” 的意思是从 base.html 里继承基本的 “骨架”。

$ vi app/templates/index.html
{% extends "base.html" %}

{% block content %}
    <h2>List of pages
    <ul>
        {% for page in pages %}
        <li>
            <a href="{{ url_for("page", path=page.path) }}">{{ page.title }}</a>
        </li>
        {% else %}
        <li>No post.</li>
        {% endfor %}
    </ul>
{% endblock content %}
$ vi app/templates/page.html
{% extends "base.html" %}

{% block content %}
    <h2>{{ page.title }}</h2>
    {{ page.html|safe }}
{% endblock content %}

Flask-FlatPages 模块会默认从 pages 目录里寻找 .md 结尾的 Markdown 文档,所以我们把静态博客的内容都放在这个目录里:

$ vi app/pages/hello-world.md
title: Hello World
date: 2014-10-14
tags: [general, blog]

**Hello World**!

$ vi app/pages/test-flatpages.md
title: Test Flask FlatPages
date: 2014-10-15
tags: [python, flask]

Test [Flask-FlatPages](https://pythonhosted.org/Flask-FlatPages/)

运行

基本搞定,运行看看效果吧:

$ flask/bin/python app/blog.py
 * Running on http://127.0.0.1:8000/
 * Restarting with reloader

build a static blog with flask

静态化

到目前为止,上面的博客运行良好,但是有个问题,这个博客还不是 “静态” 的,没有生成任何 html 文件,不能直接放到 nginx/apache 这样的 web 服务器下用。所以我们需要另一个 Flask 模块 Frozen-Flask 的帮助。

安装 Frozen-Flask:

$ flask/bin/pip install frozen-flask

修改 blog.py,导入 Flask-Frozen 模块,初始化 Freezer,使用 freezer.freeze() 生成静态 HTML:

$ vi app/blog.py
...
from flask_flatpages import FlatPages
from flask_frozen import Freezer
import sys
...
flatpages = FlatPages(app)
freezer = Freezer(app)
...
if __name__ == '__main__':
    if len(sys.argv) > 1 and sys.argv[1] == "build":
        freezer.freeze()
    else:
        app.run(port=8000)

运行 blog.py build 后就在 app 目录下生成 build 目录,build 目录里面就是我们要的 HTML 静态文件:

$ flask/bin/python app/blog.py build

$ ls app/
blog.py   build     pages     static    templates

更清晰的目录结构如下:

$ tree app
app
├── blog.py
├── build
│   ├── index.html
│   └── pages
│       ├── hello-world
│       │   └── index.html
│       └── test-flatpages
│           └── index.html
├── pages
│   ├── hello-world.md
│   └── test-flatpages.md
├── static
└── templates
    ├── base.html
    ├── index.html
    └── page.html

安装和使用 Elasticsearch

Elasticsearch 是开源搜索平台的新成员,实时数据分析的神器,发展迅猛,基于 Lucene、RESTful、分布式、面向云计算设计、实时搜索、全文搜索、稳定、高可靠、可扩展、安装+使用方便,介绍都说的很好听,好不好用拿出来遛一遛。

做了个简单测试,在两台完全一样的虚拟机上,2000万条左右数据,Elasticsearch 插入数据速度比 MongoDB 慢很多(可以忍受),但是搜索/查询速度快10倍以上,这只是单机情况,多机集群情况下 Elasticsearch 表现更好一些。以下安装步骤在 Ubuntu Server 14.04 LTS 上完成。

安装 Elasticsearch

升级系统后安装 Oracle Java 7,既然 Elasticsearch 官方推荐使用 Oracle JDK 7 就不要尝试 JDK 8 和 OpenJDK 了:

$ sudo apt-get update
$ sudo apt-get upgrade

$ sudo apt-get install software-properties-common
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update

$ sudo apt-get install oracle-java7-installer

加入 Elasticsearch 官方源后安装 elasticsearch:

$ wget -O - http://packages.elasticsearch.org/GPG-KEY-elasticsearch | sudo apt-key add -
$ sudo echo "deb http://packages.elasticsearch.org/elasticsearch/1.1/debian stable main" >> /etc/apt/sources.list

$ sudo apt-get update
$ sudo apt-get install elasticsearch

加入到系统启动文件并启动 elasticsearch 服务,用 curl 测试一下安装是否成功:

$ sudo update-rc.d elasticsearch defaults 95 1

$ sudo /etc/init.d/elasticsearch start

$ curl -X GET 'http://localhost:9200'
{
  "status" : 200,
  "name" : "Fer-de-Lance",
  "version" : {
    "number" : "1.1.1",
    "build_hash" : "f1585f096d3f3985e73456debdc1a0745f512bbc",
    "build_timestamp" : "2014-04-16T14:27:12Z",
    "build_snapshot" : false,
    "lucene_version" : "4.7"
  },
  "tagline" : "You Know, for Search"
}

Elasticsearch 的集群和数据管理界面 Marvel 非常赞,可惜只对开发环境免费,如果这个工具也免费就无敌了,安装很简单,完成后重启服务访问 http://192.168.2.172:9200/_plugin/marvel/ 就可以看到界面:

$ sudo /usr/share/elasticsearch/bin/plugin -i elasticsearch/marvel/latest

$ sudo /etc/init.d/elasticsearch restart
 * Stopping Elasticsearch Server                                           [ OK ]
 * Starting Elasticsearch Server                                           [ OK ]

Elasticsearch Marvel

安装 Python 客户端驱动

和 MongoDB 一样,我们一般用程序和 Elasticsearch 交互,Elasticsearch 也支持多种语言的客户端驱动,这里仅安装 Python 驱动,其他语言可以参考官方文档。

$ sudo apt-get install python-pip
$ sudo pip install elasticsearch

写个简单程序把 gene_info.txt 的数据导入到 Elasticsearch:

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os, os.path, sys, re
import csv, time, string
from datetime import datetime
from elasticsearch import Elasticsearch

def import_to_db():
    data = csv.reader(open('gene_info.txt', 'rb'), delimiter='\t')
    data.next()

    es = Elasticsearch()
    for row in data:
        doc = {
            'tax_id': row[0],
            'GeneID': row[1],
            'Symbol': row[2],
            'LocusTag': row[3],
            'Synonyms': row[4],
            'dbXrefs': row[5],
            'chromosome': row[6],
            'map_location': row[7],
            'description': row[8],
            'type_of_gene': row[9],
            'Symbol_from_nomenclature_authority': row[10],
            'Full_name_from_nomenclature_authority': row[11],
            'Nomenclature_status': row[12],
            'Other_designations': row[13],
            'Modification_date': row[14]
        }
        res = es.index(index="gene", doc_type='gene_info', body=doc)

def main():
    import_to_db()

if __name__ == "__main__":
    main()

Kibana 是一个功能强大的数据显示客户端,通过插件方式和 Elasticsearch 集成在一起,安装很容易,下载解压就可以了,然后重启 Elasticsearch 服务访问 http://192.168.2.172:9200/_plugin/kibana/ 就能看到界面:

$ wget https://download.elasticsearch.org/kibana/kibana/kibana-3.0.1.tar.gz
$ tar zxvf kibana-3.0.1.tar.gz
$ sudo mv kibana-3.0.1 /usr/share/elasticsearch/plugins/_site
$ sudo /etc/init.d/elasticsearch restart

Elasticsearch Kibana

安装和使用系统监控工具 Glances

Glances 是前几天网上闲逛的时候发现的一款 “新” 系统监控工具,尽管现在监控工具有很多选择,Glances 还是有些值得关注的,和那些常用的老牌监控工具比起来,比如 top/vmstat/iostat 只能监控本机系统,Glances 可以监控本机也可以通过客户端服务器模式监控其他机器;Glances 提供了基于 XML/RPC 的 API 便于其他程序调用,可编程;Glances 可以将数据输出保存到 csv 或 html 格式的文件方便其他程序处理(报告或绘制图形)。

Glances 是用 Python 开发的,使用 psutil 库来采集系统数据,在用户的终端上实时动态的显示重要的系统数据和变化。显示的数据包括:CPU、内存、磁盘、网络等使用情况,内核、运行队列、负载、I/O 状态、消耗资源最多的进程等等。

安装

Glance 支持 Linux, Mac OS X, FreeBSD, Windows 等多个系统,安装也很方便。在 Ubuntu 上安装:

$ sudo apt-get update
$ sudo apt-get install python-pip build-essential python-dev

$ sudo pip install glances

在 CentOS 6.x 上安装:

$ su root
# rpm -ivh http://fr2.rpmfind.net/linux/epel/6/x86_64/epel-release-6-7.noarch.rpm
# yum install python-pip python-devel
# pip-python install glances

在 FreeBSD 上安装:

# pkg_add -r py27-glances
或者
# cd /usr/ports/sysutils/py-glances/
# make install clean

使用

Glances 可以单机使用,也可以客户端-服务器模式多机使用。单机使用很简单,直接运行就可以了:

$ glances

客户端-服务器模式稍微复杂一点,需要在一台机器上以服务器模式启动 glances -s,另外一台机器以客户端模式连接 glances -c. 比如在有两台机器 A 和 B 都装了 glances,要想在 A 上看 B 上的 glances 的话需要事先在 B 上用服务器模式启动 glances(假设 B 的 IP 地址是 192.168.2.22):

$ glances -s

然后再从 A(客户端)用 Glances 访问 B(服务器):

$ glances -c 192.168.2.22

编程

Glances 和其他一堆老牌系统监控工具相比其突出优点在于提供 XML-RPC API,可编程。使用 Glances 提供的 API,我们可以通过编程轻松获取(我们想要的)数据。比如下面的是一个打印系统信息的简单 Python 脚本:

$ vi test.py
#!/usr/bin/python
import xmlrpclib

s = xmlrpclib.ServerProxy('http://192.168.2.22:61209')
print s.getSystem()

运行上面这个脚本:

$ python test.py
{"linux_distro": "Ubuntu 12.04", "platform": "64bit", "os_name": "Linux", "hostname": "vpsee.com", "os_version": "3.2.0-23-virtual"}

Glances 的界面:
docker desktop

使用浏览器访问 Linux 终端

wssh 可以让我们通过 HTTP 来调用远程的一个 shell,也就是说我们可以用浏览器来访问某个 Linux 服务器/虚拟机的终端(只要这个服务器上运行了 wsshd 服务器端)。wssh 客户端通过 ssh 帐号连接到 wsshd 服务器端。wssh 更多的是当作库来开发一些应用,比如开发云计算、虚拟机后台控制面板的虚拟机控制台等等。我们先来玩一下简单的~

安装一些必要软件:

$ sudo apt-get install git gcc python libevent-dev python-dev python-pip

安装 wssh 需要的各种 Python 库:

$ sudo pip install gevent gevent-websocket paramiko flask

下载并安装 wssh:

$ git clone https://github.com/aluzzardi/wssh.git
$ cd wssh
$ sudo python setup.py install

运行 wsshd:

$ wsshd
wsshd/0.1.0 running on 0.0.0.0:5000

从浏览器打开 http://IP:5000 后会看到如下登陆界面:
wssh

使用 ssh 帐号登陆后就可以看到终端了:
wssh

使用 Python-LDAP 操作 LDAP

周末看到那些排队血拼的人们,不用走进 shopping mall、不用看到那些五颜六色的打折和视觉冲击就能感受到 “节日要到了!”。一年又快结束了,这周完成备份、升级之类的收尾工作,接下来就是6周的假期,没啥大安排,假期第1周去南非德班参加高性能计算会议,回来后和家人短途旅行,然后圣诞节在家休息学点新东西,比如修车什么的,几次痛苦经历告诉我出来玩迟早是要坏的,对于 hiking/camping/road trip/4×4 这几个关键字的爱好者来说懂点维修常识是必须的。废话留到假期再说吧,接下来六周可能没有技术方面的博客更新~

最近对 LDAP 服务器上面的数据做处理,有机会接触了一下 Python-LDAP 这个库和 LDAP/Kerberos. 去除所有打印和错误处理的代码后,用 Python-LDAP 操作 LDAP 的骨干代码其实很简单,就这么几行,唯一遇到的一个小麻烦就是折腾了一个多小时才知道 ‘TRUE’ 要大写(后面有说到)。

安装 Python-LDAP

在 Ubuntu/Debian 下安装 python-ldap 模块:

$ sudo apt-get install python-ldap

在 CentOS/RHEL 下安装 python-ldap 模块:

# yum install python-ldap

创建

创建一条 LDAP 新纪录。有个要注意的地方,我们的 LDAP 有个属性 active,用来判断用户帐号是否是激活的 attrs[‘active’] = ‘TRUE’,这里的 ‘TRUE’ 不能用小写的 ‘true’,刚开始被 LDAP 管理工具上的小写 ‘true’ 误导,老以为 Python 程序里也应该用小写,结果总报错。

phpLDAPadmin

def ldap_add(firstname, lastname, username):
    l = ldap.open(LDAP_HOST)
    l.protocol_version = ldap.VERSION3
    l.simple_bind(LDAP_BIND, LDAP_PASS)

    cn = firstname + ' ' + lastname
    addDN = "cn=%s,ou=People,dc=vpsee,dc=com" % cn
    attrs = {}
    attrs['objectclass'] = ['top','person','inetOrgPerson','posixAccount','vpseeAccount']
    attrs['cn'] = cn
    attrs['givenName'] = firstname
    attrs['homeDirectory'] = '/home/people/%s' % username
    attrs['loginShell'] = '/bin/bash'
    attrs['sn'] = lastname
    attrs['uid'] = username
    attrs['uidNumber'] = ldap_newuid()
    attrs['gidNumber'] = ldap_getgid()
    attrs['active'] = 'TRUE'
    ldif = modlist.addModlist(attrs)
    l.add_s(addDN, ldif)
    l.unbind_s()

查找和读取

查找和读取一条 LDAP 纪录,比如根据 username 查找出 cn:

def ldap_getcn(username):
    try:
        l = ldap.open(LDAP_HOST)
        l.protocol_version = ldap.VERSION3
        l.simple_bind(LDAP_BIND, LDAP_PASS)

        searchScope = ldap.SCOPE_SUBTREE
        searchFilter = "uid=*" + username + "*"
        resultID = l.search(LDAP_BASE, searchScope, searchFilter, None)
        result_set = []
        while 1:
            result_type, result_data = l.result(resultID, 0)
            if (result_data == []):
                break
            else:
                if result_type == ldap.RES_SEARCH_ENTRY:
                    result_set.append(result_data)
        return result_set[0][0][1]['cn'][0]
    except ldap.LDAPError, e:
        print e

更新

更新一条 LDAP 纪录,比如更新用户状态 active 为 false:

def ldap_deactive(username):
    try:
        l = ldap.open(LDAP_HOST)
        l.protocol_version = ldap.VERSION3
        l.simple_bind(LDAP_BIND, LDAP_PASS)

        deactiveDN = ("cn=%s," + LDAP_BASE) % ldap_getcn(username)
        old = {'active':'TRUE'}
        new = {'active':'FALSE'}
        ldif = modlist.modifyModlist(old, new)
        l.modify_s(deactiveDN, ldif)
        l.unbind_s()
    except ldap.LDAPError, e:
        print e

删除

删除一条 LDAP 纪录:

def ldap_delete(username):
    try:
        l = ldap.open(LDAP_HOST)
        l.protocol_version = ldap.VERSION3
        l.simple_bind(LDAP_BIND, LDAP_PASS)

        deleteDN = ("cn=%s," + LDAP_BASE) % ldap_getcn(username)
        l.delete_s(deleteDN)
    except ldap.LDAPError, e:
        print e

Hello World, OpenNebula Cloud API 编程

先报告一下我们云计算项目的进度。去年休假前订购的服务器和部件已经陆续到货了,计算节点采用的是 Dell PowerEdge M710HD 刀片服务器,特别为数据中心级虚拟应用设计,海量内存、密集 IO 吞吐等优势,特别适合云计算、虚拟机等应用。现在正在等 Dell 的售后技术人员过来安装服务器和存储阵列,有些电源和机柜问题需要解决,顺利的话下周服务器可以上线。

dell poweredge m710hd

OpenNebula 提供了 XML-RPC 的方式访问 OpenNebula Cloud Api (OCA),这样就允许不同操作系统、不同语言编写的客户端程序可以通过 XML-RPC 远程调用的方式来访问 OpenNebula 服务。下面通过两个不同语言编写的最简单例子抛砖引玉一下,来看看如何是如何与 OCA 打交道的。

OpenNebula 绝大部分是由 Ruby 编写的,其提供的 Ruby OCA API 实现当然是最丰富和完整的。先安装 Ruby OCA Bindings:

$ sudo gem install oca

用 Ruby 编写一小段代码试验一下,以下代码用来打印当前云里每个计算结点的 hostname:

#!/usr/bin/ruby

require 'rubygems'
require 'oca'

include OpenNebula

# OpenNebula credentials
CREDENTIALS = "oneadmin:vpsee"

# XML_RPC endpoint where OpenNebula is listening
ENDPOINT    = "http://localhost:2633/RPC2"

client = Client.new(CREDENTIALS, ENDPOINT)
host_pool = HostPool.new(client)
rc = host_pool.info

# Print all the hostname from the host pool
host_pool.each do |host|
     puts host.name
end

再来看看用 Python 如何编写上面类似功能的代码。安装 Python OCA Bindings:

$ sudo easy_install oca

用 Python 编写一小段代码看一下:

#!/usr/bin/python

import oca

# OpenNebula credentials
CREDENTIALS = "oneadmin:vpsee"

# XML_RPC endpoint where OpenNebula is listening
ENDPOINT    = "http://localhost:2633/RPC2"

client = oca.Client(CREDENTIALS, ENDPOINT)
host_pool = oca.HostPool(client)
host_pool.info()

# Print all the hostname from the host pool
for host in host_pool:
    print host.name

应该没人会想在这种情况下用 Java 或 C++ 吧,Programming Examples 里面提供的 Java OCA 和 C++ 例子比 Ruby, Python 复杂得多。

随机生成 Xen 虚拟机的 MAC 地址

如果不在创建 Xen 虚拟机(domU)的时候指定 MAC 地址的话,Xen 就会随机生成一个 MAC 地址给虚拟机,这样管理员就没办法知道虚拟机的 MAC 地址了,不利于以后带宽的统计和虚拟机的管理,所以最好就在 Xen 虚拟机配置文件中直接指明虚拟机的 MAC 地址,比如:

# vi /etc/xen/vpsuser1
vif = [ "mac=00:16:3e:0c:11:53,ip=172.16.16.200,bridge=xenbr0" ]

IEEE OUI 为 Xen domU 保留了一段 MAC 地址,前3段是 “00-16-3e”,后3段是随机的,其中第1个随机段的第1个 bit 是0(IEEE OUI 给 VMware 保留的前3段地址是 “00-0c-29”,后3段随机)。我们在为用户分配 MAC 地址时候不想有规律的分配(虽然也不会有什么安全问题),所以 VPSee 写了一个简单的 Python 脚本用来随机生成符合 Xen 虚拟机标准的 MAC 地址:

#!/usr/bin/python 
# generates a MAC address for Xen domU
# http://www.vpsee.com
#

import random

mac = [ 0x00, 0x16, 0x3e, random.randint(0x00, 0x7f), 
random.randint(0x00, 0xff), random.randint(0x00, 0xff) ]
s = []
for item in mac:
        s.append(str("%02x" % item))
print ':'.join(s)

简单调试 Python 程序

在 Python 中也可以像 gcc/gdb 那样调试程序,只要在运行 Python 程序时引入 pdb 模块(假设要调试的程序名为 d.py):

$ vi d.py
#!/usr/bin/python

def main():
        i, sum = 1, 0
        for i in xrange(100):
                sum = sum + i
        print sum

if __name__ == '__main__':
        main()

$ python -m pdb d.py

运行上面的命令后进入以下界面,可以输入类似 gdb 的命令来改变程序的执行流程:

$ python -m pdb 1.py 
> d.py(3)()
-> def main():
(Pdb) 

list 显示程序的最近代码段:

(Pdb) list
  1  	#!/usr/bin/python
  2  	
  3  ->	def main():
  4  		i, sum = 1, 0
  5  		for i in xrange(100):
  6  			sum = sum + i
  7  		print sum
  8  	 
  9  	if __name__ == '__main__':
 10  		main()
[EOF]

next 或者 n 执行下一行代码:

(Pdb) next
> d.py(9)()
-> if __name__ == '__main__':

用 break 在第6行设置一个断点:

(Pdb) break d.py:6
Breakpoint 1 at d.py:6

(Pdb) list
  1  	#!/usr/bin/python
  2  	
  3  	def main():
  4  		i, sum = 1, 0
  5  ->		for i in xrange(100):
  6 B			sum = sum + i
  7  		print sum
  8  	 
  9  	if __name__ == '__main__':
 10  		main()
[EOF]

如果想在函数处设置断点:

(Pdb) break d.main
d.py:3

(Pdb) list
  1  	#!/usr/bin/python
  2  	
  3 B	def main():
  4  ->		i, sum = 1, 0
  5  		for i in xrange(100):
  6  			sum = sum + i
  7  		print sum
  8  	 
  9  	if __name__ == '__main__':
 10  		main()
[EOF]

还可以给断点加条件,比如设置条件只有当 sum > 50 的时候才 break:

(Pdb) break d.py:6, sum > 50
Breakpoint 1 at d.py:6

如果想查看某个变量的值,可以用 pp 命令打印出来:

(Pdb) step
> d.py(5)main()
-> for i in xrange(100):
(Pdb) pp sum
0

可以直接在程序里使用 pdb 模块,import pdb 后 pdb.set_trace():

#!/usr/bin/python
import pdb

def main():
        i, sum = 1, 0
        for i in xrange(100):
                sum = sum + i
        pdb.set_trace()
        print sum

if __name__ == '__main__':
        main()

这样只要运行程序 ./d.py 就可以直接运行到 print sum 处:

$ ./d.py 
> d.py(9)main()
-> print sum
(Pdb) 

总结

命令 用途
break 或 b 设置断点
continue 或 c 继续执行程序
list 或 l 查看当前行的代码段
step 或 s 进入函数
return 或 r 执行代码直到从当前函数返回
exit 或 q 中止并退出
next 或 n 执行下一行
pp 打印变量的值
help 帮助

用 Python 做单词拼写检查

这几天在翻旧代码时发现以前写的注释部分有很多单词拼写错误,这些单词错得不算离谱,应该可以用工具自动纠错绝大部分。用 Python 写个拼写检查脚本很容易,如果能很好利用 aspell/ispell 这些现成的小工具就更简单了。

要点

1、输入一个拼写错误的单词,调用 aspell -a 后得到一些候选正确单词,然后用距离编辑进一步嗮选出更精确的词。比如运行 aspell -a,输入 ‘hella’ 后得到如下结果:
hell, Helli, hello, heal, Heall, he’ll, hells, Heller, Ella, Hall, Hill, Hull, hall, heel, hill, hula, hull, Helga, Helsa, Bella, Della, Mella, Sella, fella, Halli, Hally, Hilly, Holli, Holly, hallo, hilly, holly, hullo, Hell’s, hell’s

2、什么是距离编辑(Edit-Distance,也叫 Levenshtein algorithm)呢?就是说给定一个单词,通过多次插入、删除、交换、替换单字符的操作后枚举出所有可能的正确拼写,比如输入 ‘hella’,经过多次插入、删除、交换、替换单字符的操作后变成:
‘helkla’, ‘hjlla’, ‘hylla’, ‘hellma’, ‘khella’, ‘iella’, ‘helhla’, ‘hellag’, ‘hela’, ‘vhella’, ‘hhella’, ‘hell’, ‘heglla’, ‘hvlla’, ‘hellaa’, ‘ghella’, ‘hellar’, ‘heslla’, ‘lhella’, ‘helpa’, ‘hello’, …

3、综合上面2个集合的结果,并且考虑到一些理论知识可以提高拼写检查的准确度,比如一般来说写错单词都是无意的或者误打,完全错的单词可能性很小,而且单词的第一个字母一般不会拼错。所以可以在上面集合里去掉第一个字母不符合的单词,比如:’Sella’, ‘Mella’, khella’, ‘iella’ 等,这里 VPSee 不删除单词,而把这些单词从队列里取出来放到队列最后(优先级降低),所以实在匹配不了以 h 开头的单词才去匹配那些以其他字母开头的单词。

4、程序中用到了外部工具 aspell,如何在 Python 里捕捉外部程序的输入和输出以便在 Python 程序里处理这些输入和输出呢?Python 2.4 以后引入了 subprocess 模块,可以用 subprocess.Popen 来处理。

5、Google 大牛 Peter Norvig 写了一篇 How to Write a Spelling Corrector 很值得一看,大牛就是大牛,21行 Python 就解决拼写问题,而且还不用外部工具,只需要事先读入一个词典文件。本文程序的 edits1 函数就是从牛人家那里 copy 的。

代码

 
#!/usr/bin/python
# A simple spell checker
# written by http://www.vpsee.com 

import os, sys, subprocess, signal

alphabet = 'abcdefghijklmnopqrstuvwxyz'

def found(word, args, cwd = None, shell = True):
    child = subprocess.Popen(args, 
        shell = shell,  
        stdin = subprocess.PIPE, 
        stdout = subprocess.PIPE, 
        cwd = cwd,  
        universal_newlines = True) 
    child.stdout.readline()
    (stdout, stderr) = child.communicate(word)
    if ": " in stdout:
        # remove \n\n
        stdout = stdout.rstrip("\n")
        # remove left part until :
        left, candidates = stdout.split(": ", 1) 
        candidates = candidates.split(", ")
        # making an error on the first letter of a word is less 
        # probable, so we remove those candidates and append them 
        # to the tail of queue, make them less priority
        for item in candidates:
            if item[0] != word[0]: 
                candidates.remove(item)
                candidates.append(item)
        return candidates
    else:
        return None

# copy from http://norvig.com/spell-correct.html
def edits1(word):
    n = len(word)
    return set([word[0:i]+word[i+1:] for i in range(n)] +                     
        [word[0:i]+word[i+1]+word[i]+word[i+2:] for i in range(n-1)] +
        [word[0:i]+c+word[i+1:] for i in range(n) for c in alphabet] +
        [word[0:i]+c+word[i:] for i in range(n+1) for c in alphabet])

def correct(word):
    candidates1 = found(word, 'aspell -a')
    if not candidates1:
        print "no suggestion"
        return  

    candidates2  = edits1(word)
    candidates  = []
    for word in candidates1:
        if word in candidates2:
            candidates.append(word)
    if not candidates:
        print "suggestion: %s" % candidates1[0]
    else:
        print "suggestion: %s" % max(candidates)

def signal_handler(signal, frame):
    sys.exit(0)

if __name__ == '__main__':
    signal.signal(signal.SIGINT, signal_handler)
    while True:
        input = raw_input()
        correct(input)

更简单的方法

当然直接在程序里调用相关模块最简单了,有个叫做 PyEnchant 的库支持拼写检查,安装 PyEnchant 和 Enchant 后就可以直接在 Python 程序里 import 了:

>>> import enchant
>>> d = enchant.Dict("en_US")
>>> d.check("Hello")
True
>>> d.check("Helo")
False
>>> d.suggest("Helo")
['He lo', 'He-lo', 'Hello', 'Helot', 'Help', 'Halo', 'Hell', 'Held', 'Helm', 'Hero', "He'll"]
>>>

如何查看进程 IO 读写情况?

Linux Kernel 2.6.20 以上的内核支持进程 IO 统计,可以用类似 iotop 这样的工具来监测每个进程对 IO 操作的情况,就像用 top 来实时查看进程内存、CPU 等占用情况那样。但是对于 2.6.20 以下的 Linux 内核版本就没那么幸运了,根据 Stack Overflow 的这篇回帖 给出的方法,VPSee 写了一个简单的 Python 脚本用来在 linux kernel < 2.6.20 下打印进程 IO 状况。

Kernel < 2.6.20

这个脚本的想法很简单,把 dmesg 的结果重定向到一个文件后再解析出来,每隔1秒钟打印一次进程 IO 读写的统计信息,执行这个脚本需要 root:

#!/usr/bin/python
# Monitoring per-process disk I/O activity
# written by http://www.vpsee.com 

import sys, os, time, signal, re

class DiskIO:
    def __init__(self, pname=None, pid=None, reads=0, writes=0):
        self.pname = pname 
        self.pid = pid
        self.reads = 0
        self.writes = 0

def main():
    argc = len(sys.argv)
    if argc != 1:
        print "usage: ./iotop"
        sys.exit(0)

    if os.getuid() != 0:
        print "must be run as root"
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    os.system('echo 1 > /proc/sys/vm/block_dump')
    print "TASK              PID       READ      WRITE"
    while True:
        os.system('dmesg -c > /tmp/diskio.log')
        l = []  
        f = open('/tmp/diskio.log', 'r')
        line = f.readline()
        while line:
            m = re.match(\
                '^(\S+)\((\d+)\): (READ|WRITE) block (\d+) on (\S+)', line)
            if m != None:
                if not l:       
                    l.append(DiskIO(m.group(1), m.group(2)))
                    line = f.readline() 
                    continue            
                found = False   
                for item in l:  
                    if item.pid == m.group(2):
                        found = True            
                        if m.group(3) == "READ":
                            item.reads = item.reads + 1 
                        elif m.group(3) == "WRITE":
                            item.writes = item.writes + 1
                if not found:   
                    l.append(DiskIO(m.group(1), m.group(2)))
            line = f.readline()
        time.sleep(1)
        for item in l:
            print "%-10s %10s %10d %10d" % \
                (item.pname, item.pid, item.reads, item.writes)

def signal_handler(signal, frame):
    os.system('echo 0 > /proc/sys/vm/block_dump')
    sys.exit(0)

if __name__=="__main__":
    main()


继续阅读 »