pyppeteer 模拟登陆淘宝

pyppeteer 文档地址
https://miyakogi.github.io/pyppeteer/index.html

import asyncio
from pyppeteer import launch

async def main():

    #初始化浏览器
    browser = await launch({'headless':False,'autoClose':False,'args':['--no-sandbox'],})
    #新建页面
    page = await browser.newPage()
    #设置高度和宽度
    await page.setViewport({'width': 1200, 'height': 800})
    #登录界面
    url = "https://login.taobao.com/member/login.jhtml"
    #打开淘宝登录界面
    await page.goto(url)

    #设置 webdriver为False 如果不设置淘宝会禁止你登录
    await page.evaluate('''
            () =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }
    ''')

    #判断是否有密码登录按钮
    while not await page.querySelector('.quick-form .login-links a.forget-pwd'):
        pass

    #点击密码登录
    await page.click('.quick-form .login-links a.forget-pwd')

    #设置登录密码
    await page.type('#TPL_username_1','***')
    await page.type('#TPL_password_1','***')

    #判断是否有登录按钮
    while not await page.querySelector('#J_SubmitStatic'):
        pass

    #点击登录按钮
    await page.click('#J_SubmitStatic')

    #停止1s
    await asyncio.sleep(1)

    #如果页面刷新,没有跳转到个人页面,是因为要拖拽验证码登录,这里要重新设置 webdriver为false
    await page.evaluate('''
                () =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }
        ''')

    #判断是不是个人页面
    if(await page.querySelector('#TPL_password_1')):

        #重新设置密码
        await page.type('#TPL_password_1', '*/***')

        #判断是否有拖拽验证码
        while not await page.querySelector('#nc_1_n1z'):
            pass

        #有就获取节点
        el = await page.querySelector('#nc_1_n1z')
        #获取节点属性
        box = await el.boundingBox()
        #把鼠标移动到,节点上
        await page.hover('#nc_1_n1z')
        #点击
        await page.mouse.down()
        #拖拽x,y值,拖到最后
        await page.mouse.move(box['x']+1000,box['y'])
        #松开鼠标
        await page.mouse.up()

        #点击登录这个时候,应该会跳转到个人中心页面
        await page.click('#J_SubmitStatic')



if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

selenium ActionChains 拖动操作

最近想趴下京东的数据,发现京东的登录需要拖拽图片登录验证。

接着发现selenium有ActionChains类实现

首先导入

from selenium.webdriver.common.action_chains import ActionChains
#查找需要拖拽的元素
click_btn = driver.find_element_by_css_selector('.JDJRV-slide-inner.JDJRV-slide-btn')
#实例化 ActionChains对象
action = ActionChains(driver)
#click_and_hold 是点击拖拽 move_by_offset是拖拽的位置
action.click_and_hold(click_btn).move_by_offset(91, 0).release().perform()

python3 socket 上传文件

最近不想用ftp和lrzsz,就上网看下有没有关于socket上传教程。刚好看到有一篇python sokcet 上传图片教程,就试着编写下,结果挺简单的。

参考网址:
https://www.cnblogs.com/xiaokang01/p/9069048.html

首先是服务端:(不太清楚的可以看看 慕课网的
https://www.imooc.com/video/17674 这个教程哦)

import socket

sk = socket.socket()

port = ("0.0.0.0",8998)

sk.bind(port)

sk.listen(5)

conn,addr = sk.accept()

#获取传递过来的文件名称
filename = conn.recv(1024)

while True:
    #插进文件
    fp = open(filename, 'wb')
    print("start...")
    #循环获取传递过来的数据
    while True:
        #获取文件数据10240 (10240这个数越大获取数据越多 (- -) 其实不是这样的,但是现在先理解成这样)
        filedata = conn.recv(10240)
        #写入数据
        fp.write(filedata)
        #判断传递过来的数据是否存在,不存在跳出循环
        if not filedata:
            break
    #关闭文件
    fp.close()
    break
#关闭socket
sk.close()
print("上传成功哦 ,文件名称:", filename)

客户端:

import socket
import os
import time

#输入上传文件路径
file_name = input("请输入完整路径文件名称:")

#判断文件是否存在
if not os.path.exists(file_name):
    print("文件不存在!")
    exit()

#获取文件名称
name = os.path.basename(file_name)

sk = socket.socket()

port = ("127.0.0.1",8998)

sk.connect(port)

#发送文件名称
sk.send(name.encode())

#停止1秒 (可以把它注释掉看看哦)
time.sleep(1)

#获取文件大小
file_all_size = os.path.getsize(file_name)

#文件下载大小初始化
file_down_size = 0

#打开文件
fp = open(file_name, 'rb')

#输出的下载百分比是否相同
last_size = 0

while True:
    #循环读取文件
    filedata = fp.read(10240)

    #文件读取完退出循环
    if not filedata:
      break

    #发送文件数据
    sk.sendall(filedata)

    #获取文件下载大小
    file_down_size = file_down_size+len(filedata)
    #转为2个小数点
    size = float(format(file_down_size/file_all_size,'.2f'))
    #当前下载百分比
    down_size = int(size*100)

    #判断输出的下载百分比是否相同 相同赋值当前百分比,否则输出上传百分比
    if down_size==last_size:
        last_size = down_size
    else:
        print(str(down_size)+"%")
        last_size = down_size

#关闭文件
fp.close()

我们运行下:

先运行 服务端

在运行 客户端

切换到服务端

ok,完成!

appium 编写获取 虎扑社区信息脚本

首先我们打款appium桌面版

运行

接着下载
pip install Appium-Python-Client

先用aapt工具查看下虎扑包的 包名 和 首activity

aapt dump badging C:\Users\Administrator\Desktop\hupu.apk (你当前包名地址)

这个就是包名
这个就是首activity

接着我们连接 我们的安卓手机

我们用 adb devices 查看连接的手机名

然后我们要 adb kill-server 不然appium的adb就会被占用

好了 万事俱备只欠运行了

我们点一下搜索按钮
{
  "platformName": "Android", //系统 android or ios
  "platformVersion": "7.0", //系统 版本 7.0 5.1 4.0
  "deviceName": "BWN9X17615W02808", //手机名称
  "appPackage": "com.hupu.games", //打开的apk包名
  "appActivity": "com.hupu.games.activity.LaunchActivity" //apk首页
}

保存

然后 start session

如果成功的话会看到一些画面

我们在手机上点允许

点一下 appium的 刷新按钮

我们随便点击一个更多按钮 右边就是这个按钮的属性,有了这些属性我们就可以写python脚本了。

代码实现逻辑

  1. 首先 进来的时候 点击允许权限
  2. 进来后 点击 更多
  3. 再点击手机图标 然后进到登录页 点击 账号密码登录用send_key 来输入手机和密码 再点击登录
  4. 登录后 点击社区按钮 然后 循环 上拉加载 把显示在当前屏幕的 每一行的内容打印出来。

代码实现

from appium import webdriver

desired_caps = {}

desired_caps['platformName'] = 'Android'
desired_caps['platformVersion'] = '7.0'
desired_caps['deviceName'] = 'BWN9X17615W02808'
desired_caps['appPackage'] = 'com.hupu.games'
desired_caps['appActivity'] = 'com.hupu.games.activity.LaunchActivity'

driver = webdriver.Remote("http://localhost:4723/wd/hub",desired_caps)

首先引入appium 把设备信息和包名填上 接着运行 (端口就是appium的端口)

第二部

#电话权限 xpath
phone_quanxian = driver.find_element_by_xpath("/hierarchy/android.widget.FrameLayout/android.widget.FrameLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.LinearLayout/android.widget.LinearLayout/android.widget.LinearLayout/android.widget.Button[2]")
phone_quanxian.click()

#存储权限
save_quanxian = driver.find_element_by_id("com.android.packageinstaller:id/permission_allow_button")
save_quanxian.click()

这两端代码就是点击 权限

第三步

#更多按钮
genduan_btn = driver.find_element_by_id("com.hupu.games:id/btn_mytab")
genduan_btn.click()

第四步

#登录手机按钮
denlv_phone_btn = driver.find_element_by_id("com.hupu.games:id/mobile_login_new")
denlv_phone_btn.click()

#使用密码登录
click_pass_login = driver.find_element_by_id("com.hupu.games:id/bt_accout_login").click()

#输入手机号
input_phone_number = driver.find_element_by_id("com.hupu.games:id/username_text").send_keys("**************")

#输入密码
input_pass_number = driver.find_element_by_id("com.hupu.games:id/password_text").send_keys("****************")

#登录
login = driver.find_element_by_id("com.hupu.games:id/btn_account_submit").click()

#社区按钮
sequ_btn = driver.find_element_by_id("com.hupu.games:id/btn_bbs").click()

第五步

#查看社区信息
for i in range(1000):
view_group = driver.find_element_by_class_name("android.support.v7.widget.RecyclerView")
#获取当前可见的所有列表
view_layouts = view_group.find_elements_by_class_name("android.widget.LinearLayout")
for view_layout in view_layouts:
#判断此列表是否是 主列表
if(view_layout.get_attribute("resourceId")=="com.hupu.games:id/hot_replies_main"):
#获取当前主列表,下的全部textView
text_views = view_layout.find_elements_by_class_name("android.widget.TextView")
for text_view in text_views:
#判断当前textView 文字是否是 内容
if(text_view.get_attribute("resourceId")=="com.hupu.games:id/tv_content"):
print(text_view.text)
#向上滚动
height = int(driver.get_window_size()['height'] - 200)
driver.swipe(0, height, 0,height/3)

好了我们运行整个程序

python 使用myqr 生成动态二维码

myqr 是一个可以生成动态二维码的 python模块

github:
https://github.com/sylnsfar/qrcode/blob/master/README-cn.md

下载 pip install myqr

简单使用

from MyQR import myqr

myqr.run(words=”https://www.huxierong.xyz”,colorized=True,picture=”nick.gif”,level=”L”)

生成如下图片

picture 就是要覆盖的图片

colorrized 是否填充色彩

level 生成图片的质量 L M Q H 从左到右 依次增加

下面我们来实战下吧

我们首先docker pull python 拉一个python镜像下来

docker run -it -d -p 5000:5000 -p 5001:5001 python 运行此命令

进入 容器里面

apt-get update

pip install flask

pip install myqr

apt-get install vim

运行命令 也可以 写一个 dockerFile来一次性完成

新建 web/

新建 uploads

目录

进入web目录

放入一张gif图

vim run.py

from flask import Flask,request
import hashlib
from MyQR import myqr
def md5(str):
    m = hashlib.md5()
    m.update(str.encode('utf-8'))
    return m.hexdigest()

def getUrl():
    return request.args.get('url')

app = Flask(__name__)

@app.route('/')
def hello_world():
    if getUrl():
        myqr.run(words=getUrl(),colorized=True,level="L",picture="test.gif",save_name="/uploads/static/images/"+md5(getUrl())+".gif")
        return "http://xxxxxxxx:8800/static/images/"+md5(getUrl())+".gif"
    return "Hello World"

if __name__ == '__main__':
    app.run(host='0.0.0.0')

进入uploads目录

新建static 进入static 新建images目录

进入uploads目录

vim run.py

from flask import Flask,request

app = Flask(__name__)

if __name__ == '__main__':
    app.run(host='0.0.0.0',port="5001")

进入宿主机

配置nginx 让它代理5001端口


upstream dockerserver {
    server 172.17.0.1:5001;
}
server {
        listen       8800;
        server_name localhost;
        location / {
                proxy_pass http://dockerserver;
        }
}

新建qr.php

<?php
function geturl($url){
        #$headerArray =array("Content-type:application/json;","Accept:application/json");
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, FALSE);
        curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, FALSE);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        #curl_setopt($url,CURLOPT_HTTPHEADER,$headerArray);
        $output = curl_exec($ch);
        curl_close($ch);
        #$output = json_decode($output,true);
        return $output;
}

$path = geturl("http://172.17.0.1:5000?url=".htmlspecialchars($_GET['url']));
echo "<img src='{$path}' />";

运行

一个小项目完成了。当然还有好多细节要改哦。

python selenium 谷歌浏览器后台运行

from selenium import webdriver
driver = webdriver.Chrome()
driver.get("http://www.baidu.com")
print(driver.title)

这段代码运行会调起浏览器


加上

option = webdriver.ChromeOptions()
option.add_argument('headless')

这个配置后

from selenium import webdriver
option = webdriver.ChromeOptions()
option.add_argument('headless')
driver = webdriver.Chrome(chrome_options=option)
driver.get("http://www.baidu.com")
print(driver.title)

控制台直接打印

python 抓取b站 热门专辑

目录结构

需要 aiohttp pymongo

查看页面内容

发现有接口的,那就容易多了

先根据 返回的json格式写一个model类 bilbil.py

class bilbil:

    __category = "";#类型

    __title = "";#标题

    __summary = "";#描述

    __banner_url = "";#图片地址

    __score = "";#评分

    __ctime = "";#创建时间

    __type = "";# 昨天3 前天4 周榜2 月榜1

    __url = ""#详情地址

    def __init__(self):
        pass

    def setUrl(self,name):
        self.__url = name

    def getUrl(self):
        return self.__url

    def setType(self,name):
        self.__type = name

    def getType(self):
        return self.__type

    def setCategory(self,name):
        self.__category = name

    def getCategory(self):
        return self.__category

    def setTitle(self, name):
        self.__title = name

    def getTitle(self):
        return self.__title

    def setSummary(self, name):
        self.__summary = name

    def getSummary(self):
        return self.__summary

    def setBannerUrl(self, name):
        self.__banner_url = name

    def getBannerUrl(self):
        return self.__banner_url

    def setScore(self, name):
        self.__score = name

    def getScore(self):
        return self.__score

    def setCtime(self, name):
        self.__ctime = name

    def getCtime(self):
        return self.__ctime

    def getString(self):
        str = "标题:{0},创建日期:{1},星级:{2},图片:{3},描述:{4},类型:{5}".format(self.getTitle(),self.getCtime(),self.getScore(),self.getBannerUrl(),self.getSummary(),self.getCategory())
        return str

然后封装一个mongodb类 mongodbUtils.py

from pymongo import MongoClient
class mongodbUtils:
__client = ""
__db = ""
__collecion = ""

def
__init__(self):
self.__client = MongoClient('mongodb://127.0.0.1:8827')
self.__db = self.__client['bilbil_special']
pass

def
add_collect(self,name):
self.__collecion = self.__db[name]

def insert(self,jsonName):
return self.__collecion.insert_one(jsonName)

def insert_many(self,arrJsonName):
return self.__collecion.insert_many(arrJsonName)

def del_collection(self,query):
return self.__collecion.delete_many(query)

def update_collection(self,query,values,upsert=False):
return self.__collecion.update_many(query,values,upsert=upsert)

def getCollection(self):
return self.__collecion

def getClient(self):
return self.__client

接着编写test.py文件

import aiohttp,asyncio
from connect_network.model.bilbil import bilbil
from connect_network.handle.bilbil_handle import bilbil_handle

read_url = "https://www.bilibili.com/read/cv"
bilbil_name = bilbil()
bilbil_handle = bilbil_handle()

async def fetch_async(url,i):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return (await resp.json(),i)
pass
tasks = []
for i in range(4):
i = i+1
str_url = "https://api.bilibili.com/x/article/rank/list?cid={0}&jsonp=jsonp".format(i)
tasks.append(fetch_async(str_url,i))


event_loop = asyncio.get_event_loop()
result = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()

for (val,i) in result:
for list in result[i-1][0]['data']:
try:
category = list['categories'][0]['name']
bilbil_name.setSummary(list['summary'])
bilbil_name.setCtime(list['ctime'])
bilbil_name.setBannerUrl(list['banner_url'])
bilbil_name.setScore(list['score'])
bilbil_name.setTitle(list['title'])
bilbil_name.setCategory(category)
bilbil_name.setUrl(read_url+str(list['id']))
bilbil_name.setType(result[i-1][1])
bilbil_handle.handle(bilbil_name)
except Exception as e:
print(e)
pass

然后编写一个入库的类 bilbil_handle.py

from connect_network.utils.mongodbUtils import mongodbUtils
import time
class bilbil_handle:
__model = ""
__mogdb = ""
def __init__(self,model=None):
self.__mogdb = mongodbUtils()
self.__mogdb.add_collect("tb_article")
pass

def handle(self,model):
jsonData = {
"title":model.getTitle(),
"category":model.getCategory(),
"summary":model.getSummary(),
"banner_url": model.getBannerUrl(),
"score": model.getScore(),
"ctime": model.getCtime(),
"type": model.getType(),
"url":model.getUrl(),
"create_time":int(time.time())
}

self.__mogdb.insert(jsonData)

然后运行test.py 文件

python test.py

mongo 查看是不是添加了内容

python换脸

#!/usr/bin/python

# Copyright (c) 2015 Matthew Earl
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
#     The above copyright notice and this permission notice shall be included
#     in all copies or substantial portions of the Software.
#
#     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
#     OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#     MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
#     NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
#     DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
#     OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
#     USE OR OTHER DEALINGS IN THE SOFTWARE.

"""
This is the code behind the Switching Eds blog post:
    http://matthewearl.github.io/2015/07/28/switching-eds-with-python/
See the above for an explanation of the code below.
To run the script you'll need to install dlib (http://dlib.net) including its
Python bindings, and OpenCV. You'll also need to obtain the trained model from
sourceforge:
    http://sourceforge.net/projects/dclib/files/dlib/v18.10/shape_predictor_68_face_landmarks.dat.bz2
Unzip with `bunzip2` and change `PREDICTOR_PATH` to refer to this file. The
script is run like so:
    ./faceswap.py <head image> <face image>
If successful, a file `output.jpg` will be produced with the facial features
from `<head image>` replaced with the facial features from `<face image>`.
"""

import cv2
import dlib
import numpy

import sys

PREDICTOR_PATH = "shape_predictor_68_face_landmarks.dat"
SCALE_FACTOR = 1
FEATHER_AMOUNT = 11

FACE_POINTS = list(range(17, 68))
MOUTH_POINTS = list(range(48, 61))
RIGHT_BROW_POINTS = list(range(17, 22))
LEFT_BROW_POINTS = list(range(22, 27))
RIGHT_EYE_POINTS = list(range(36, 42))
LEFT_EYE_POINTS = list(range(42, 48))
NOSE_POINTS = list(range(27, 35))
JAW_POINTS = list(range(0, 17))

# Points used to line up the images.
ALIGN_POINTS = (LEFT_BROW_POINTS + RIGHT_EYE_POINTS + LEFT_EYE_POINTS +
                RIGHT_BROW_POINTS + NOSE_POINTS + MOUTH_POINTS)

# Points from the second image to overlay on the first. The convex hull of each
# element will be overlaid.
OVERLAY_POINTS = [
    LEFT_EYE_POINTS + RIGHT_EYE_POINTS + LEFT_BROW_POINTS + RIGHT_BROW_POINTS,
    NOSE_POINTS + MOUTH_POINTS,
]

# Amount of blur to use during colour correction, as a fraction of the
# pupillary distance.
COLOUR_CORRECT_BLUR_FRAC = 0.6

detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(PREDICTOR_PATH)


class TooManyFaces(Exception):
    pass


class NoFaces(Exception):
    pass


def get_landmarks(im):
    rects = detector(im, 1)

    if len(rects) > 1:
        raise TooManyFaces
    if len(rects) == 0:
        raise NoFaces

    return numpy.matrix([[p.x, p.y] for p in predictor(im, rects[0]).parts()])


def annotate_landmarks(im, landmarks):
    im = im.copy()
    for idx, point in enumerate(landmarks):
        pos = (point[0, 0], point[0, 1])
        cv2.putText(im, str(idx), pos,
                    fontFace=cv2.FONT_HERSHEY_SCRIPT_SIMPLEX,
                    fontScale=0.4,
                    color=(0, 0, 255))
        cv2.circle(im, pos, 3, color=(0, 255, 255))
    return im


def draw_convex_hull(im, points, color):
    points = cv2.convexHull(points)
    cv2.fillConvexPoly(im, points, color=color)


def get_face_mask(im, landmarks):
    im = numpy.zeros(im.shape[:2], dtype=numpy.float64)

    for group in OVERLAY_POINTS:
        draw_convex_hull(im,
                         landmarks[group],
                         color=1)

    im = numpy.array([im, im, im]).transpose((1, 2, 0))

    im = (cv2.GaussianBlur(im, (FEATHER_AMOUNT, FEATHER_AMOUNT), 0) > 0) * 1.0
    im = cv2.GaussianBlur(im, (FEATHER_AMOUNT, FEATHER_AMOUNT), 0)

    return im


def transformation_from_points(points1, points2):
    """
    Return an affine transformation [s * R | T] such that:
        sum ||s*R*p1,i + T - p2,i||^2
    is minimized.
    """
    # Solve the procrustes problem by subtracting centroids, scaling by the
    # standard deviation, and then using the SVD to calculate the rotation. See
    # the following for more details:
    #   https://en.wikipedia.org/wiki/Orthogonal_Procrustes_problem

    points1 = points1.astype(numpy.float64)
    points2 = points2.astype(numpy.float64)

    c1 = numpy.mean(points1, axis=0)
    c2 = numpy.mean(points2, axis=0)
    points1 -= c1
    points2 -= c2

    s1 = numpy.std(points1)
    s2 = numpy.std(points2)
    points1 /= s1
    points2 /= s2

    U, S, Vt = numpy.linalg.svd(points1.T * points2)

    # The R we seek is in fact the transpose of the one given by U * Vt. This
    # is because the above formulation assumes the matrix goes on the right
    # (with row vectors) where as our solution requires the matrix to be on the
    # left (with column vectors).
    R = (U * Vt).T

    return numpy.vstack([numpy.hstack(((s2 / s1) * R,
                                       c2.T - (s2 / s1) * R * c1.T)),
                         numpy.matrix([0., 0., 1.])])


def read_im_and_landmarks(fname):
    im = cv2.imread(fname, cv2.IMREAD_COLOR)
    im = cv2.resize(im, (im.shape[1] * SCALE_FACTOR,
                         im.shape[0] * SCALE_FACTOR))
    s = get_landmarks(im)

    return im, s


def warp_im(im, M, dshape):
    output_im = numpy.zeros(dshape, dtype=im.dtype)
    cv2.warpAffine(im,
                   M[:2],
                   (dshape[1], dshape[0]),
                   dst=output_im,
                   borderMode=cv2.BORDER_TRANSPARENT,
                   flags=cv2.WARP_INVERSE_MAP)
    return output_im


def correct_colours(im1, im2, landmarks1):
    blur_amount = COLOUR_CORRECT_BLUR_FRAC * numpy.linalg.norm(
        numpy.mean(landmarks1[LEFT_EYE_POINTS], axis=0) -
        numpy.mean(landmarks1[RIGHT_EYE_POINTS], axis=0))
    blur_amount = int(blur_amount)
    if blur_amount % 2 == 0:
        blur_amount += 1
    im1_blur = cv2.GaussianBlur(im1, (blur_amount, blur_amount), 0)
    im2_blur = cv2.GaussianBlur(im2, (blur_amount, blur_amount), 0)

    # Avoid divide-by-zero errors.
    im2_blur += (128 * (im2_blur <= 1.0)).astype(im2_blur.dtype)

    return (im2.astype(numpy.float64) * im1_blur.astype(numpy.float64) /
            im2_blur.astype(numpy.float64))


im1, landmarks1 = read_im_and_landmarks('image/1.jpg')
im2, landmarks2 = read_im_and_landmarks('image/11.jpg')

M = transformation_from_points(landmarks1[ALIGN_POINTS],
                               landmarks2[ALIGN_POINTS])

mask = get_face_mask(im2, landmarks2)
warped_mask = warp_im(mask, M, im1.shape)
combined_mask = numpy.max([get_face_mask(im1, landmarks1), warped_mask],
                          axis=0)

warped_im2 = warp_im(im2, M, im1.shape)
warped_corrected_im2 = correct_colours(im1, warped_im2, landmarks1)

output_im = im1 * (1.0 - combined_mask) + warped_corrected_im2 * combined_mask

cv2.imwrite('output.jpg', output_im)

异步抓取网页 python aiohttp

    import asyncio
    import aiohttp
    
    async def get(url):
        try:
            async with aiohttp.ClientSession() as session:
                response = await session.get(url)
                result = await response.text()
                await session.close()
                return result
        except Exception as e:
            print("Fail:"+url)
            await session.close()
            return 0
    
    async def request(url):
        print('开始地址', url)
        res = await get(url)
        print('返回数据:', res)
    urls = [
        'http://csdn.com',
        'http://dev.dcloud.net.cn',
        'http://yigongla.com'
    ]
    tasks = []
    for url in urls:
        #把函数添加到task列表中
        tasks.append(request(url))
    
    try:
        loop = asyncio.get_event_loop()
        #asyncio.wait 接受一个task列表
        loop.run_until_complete(asyncio.wait(tasks))
    except Exception as e:
        print('fail')