Python: 下載 youtube 上的台灣電視節目

Posted by TJ Wei on 星期日, 5月 15, 2011 with No comments
在坐月子中心,老婆想追一下之前連續劇的進度。月子中心裡的網路有限制,用 p2p 軟體不但佔用頻寬,而且不太可行的。雖然 cattail 在 android 上推出 台灣連續劇 app,但沒帶 hdmi 線,而且用我之前寫的 acer stream hdmi test,解析度不夠。雖說如此,既然影片已經在 youtube 上了,就有辦法看得到。
簡單看了一下,發現影片資料庫的網址。下載了 drama.db 後,用 sqlite database browser 檢視一下後,簡單寫個 python script 來生成 m3u8 檔。 現在的 VLC 已經能直接播放像是 http://youtube.com/watch?v=XXXXX 的 URL 。所以生成的 m3u8 也的卻能播放,不過由於網路速度太慢,播放斷斷續續的,無法盡如人意。所以乾脆寫個程式來下載 youtube 的影片。雖然 youtube-dl 很成熟,但用殺牛的大刀來切雞肉,有點太大材小用了,況且放入自己的程式碼中,還要花點時間讀程式碼。Youtube 的 HTML 碼改過不少,網路上一下也搜尋不到怎麼解析來源 flv 或者 mp4 的方式,所以乾脆就直接看 HTML source,配合上 regexp 來擷取影片 URL,也可以選擇自己想要的解析度。
雖然也可以配合 vlc 的 python binding 來播放,但是下載並且產生 m3u8 檔已經符合我的需求。
程式在 windows 7 下的 python 2.6 及 Ubuntu 的 python 2.7 測試過。在不同的機器下,除了一開始的幾個 path 外,也許檔名(drama_m3u8 和 fn)和 sys.argv[1]   的 encoding 也可能因為系統而不同也需要更改。
****
 2011-09-27補充
該資料庫已經不再繼續更新
*******

最後寫出的 python script 如下:

# -*- coding: utf8 -*-
import sqlite3
import re
import urllib
import sys
import time
import locale

DB_FILENAME=r'./drama.db'
OUT_DIR=ur'./'
PREF_FMTS=["35","34","5"] # youtube format: flv 480, flv 360, flv 240

def best_url(fmt_dict):
    k=filter(fmt_dict.has_key, PREF_FMTS)
    return fmt_dict[k[0]] if k else None

def try_to(f, n=1, message="do the work"):
    for i in range(n):
        try:
            return f()
        except:
            print "failed to %s (#%d)"%(message,i)
    return None

def get_fmts(v):
    url="http://youtube.com/watch?v=%s"%v
    read_url=lambda :urllib.urlopen(url).read()
    parse_fmt=lambda s: (s[0],urllib.unquote(s[1].decode('unicode_escape'))
                                     .replace(r'\/','/'))
    try:
        s=re.compile('"fmt_url_map": "([^"]*)"')\
            .search(try_to(read_url, 10, "read %s"%url))
        return dict(parse_fmt(x.split("|")) for x in s.group(1).split(","))
    except:
        print "failed to parse formats"
        return {}

def get_episodes(drama_name, ep_list=None, part_list=None):
    pbar=lambda p,k,t:"#"*int(p/2+0.5)+"."*(50-int(p/2+0.5))\
          +" %5.1f%% %6.1fKB/s %3d:%02d\r"%(p,k,t/60,t%60)
    def report():
        sec0=time.time()
        return lambda n, bs, size: sys.stdout.write(
                     pbar(min(n*bs*100.0/size,100.0),
                          n*bs/1024.0/max(1.0,time.time()-sec0),
                          int(time.time()-sec0)))
    conn=sqlite3.connect(DB_FILENAME)
    c=conn.cursor()
    query='''select pltitle, plvids from playlisttable
                                    where pltitle like ?
                                    order by plds'''
    c.execute(query, (u"%%%s%%"%drama_name,))
    enum_eps=((i+1,row) for i,row in enumerate(c) 
                        if not ep_list or i+1 in ep_list)
    for ep,row in enum_eps:
        print "[ep=%d]"%ep, row[0]
        drama_m3u8=OUT_DIR+ur'%s.m3u8'%row[0]
        vids=row[1].split(",")
        allparts=range(len(vids))
        vidnames=[row[0]+u" (%d).flv"%i for i in allparts]
        open(drama_m3u8,"w").write(u"\n".join(vidnames).encode("utf-8")+"\n")
        pl=set(allparts)&set(part_list) if part_list else allparts
        for part in pl:
            fn=OUT_DIR+vidnames[part]
            print fn
            url=best_url(get_fmts(vids[part]))
            if not url:
                continue
            if try_to(lambda :
              urllib.urlretrieve(url, fn, report()), 3, "get %s"%url):
                print "\nDone"
            else:
                print "\nFailed"
    conn.close()
    
if __name__ == "__main__":
    enc=locale.getpreferredencoding()
    try:
        drama_name=sys.argv[1].decode(enc) if len(sys.argv)>1 else u"犀利人妻"
        ep_list=map(int,sys.argv[2].split(",")) if len(sys.argv)>2 else None
        part_list=map(int,sys.argv[3].split(",")) if len(sys.argv)>3 else None
    except:
        print """Usage: tvshow_download showname [ep_list [part_list]]
Example: tvshow_download 犀利人妻 3,4,5  #download episode 3,4,5
         tvshow_download 犀利人妻        #donwload all episodes
         tvshow_download 犀利人妻 13 0,1 #download part 0,1 of episode 13
""".encode(enc)
    print "args:",drama_name, ep_list, part_list
    get_episodes(drama_name, ep_list, part_list)
Categories: ,