5 min read

Python接口函数

本文于2020-10-10更新。 如发现问题或者有建议,欢迎提交 Issue

#df_register = source_register(userids)
df_register = pd.read_pickle('C:/Users/Python_bc/baichuan_model/df_register')
df_register = df_register[df_register['userid'] == userid]

1 翻译sql的心得

  • left join 其实不麻烦,全部join起来,再处理 select 的变量。
  • 详单表翻译完一张后,热情和习惯来了,继续搞。
  • 表的逻辑需要一点点update问清楚。

2 source_register函数定义

# 所有的数据源
@data_source_logs()
def source_register(userids):
    """
    数据源: register, 获取ac渠道信息
    """
    df = user_ac_register_channel(userids)
    df = _default_columns(df, ['device', 'insert_time', 'reg_chanel', 'userid'])
    df.columns = ['device', 'inserttime', 'regchanel', 'userid']
    df = _to_datetime(df, ['inserttime'])
    return df

reg_chanel看到这种真的想打人。

3 装饰器解释

这里,@data_source_logs()是啥意思?

所以@是装饰器, 他的作用是记录日志。

from utils import data_source_logs找到你了!

def data_source_logs():
    """
    数据源日志的装饰器
    """
    def real_decorator(raw_fun):
        source_name = raw_fun.__name__
        def wrapper(*args, **kwargs):
            logger.info('开始获取数据源: %s ...' % source_name)
            try:
                df = raw_fun(*args, **kwargs)
                logger.info('数据源: %s 获取完成.' % source_name)
            except Exception as e:
                logger.error('数据源: %s 获取失败. 失败原因: %s' % (source_name, traceback.format_exc()))
                raise e
            return df
        return wrapper
    return real_decorator

总之一句话, 就是为了跑日志,告诉哪段代码跑完了。

4 找接口信息

user_ac_register_channel先搞清这个函数。

在这里from model.baichuan.itf.user_ac_register_channel import user_ac_register_channel

# encoding: utf-8

# 从接口获取ac 渠道注册信息
# 接口文档: http://userac.ppdapi.com:8083/swagger-ui.html
# 董长龙
# 测试环境: 172.17.3.24  userac.ppdapi.com
# 陈建

#################  输入示例  #############
# 输入: df_userid,类型为 dataframe, 主要包含两个字段: userid, listingtime

出现了接口信息了! 如果你看完了文档,你就觉得这个信息写的很详细了,真的,这是一个好的程序员写的。

这是我唯一的底气。

问题来了,这里的config['url_ac_register_channel']是啥意思?

import yaml
import os
import logging.config
import traceback
from model.baichuan.conf.logging_conf import logging_config

# 项目路径
bash_path = os.path.abspath(os.path.join(__file__, '../../'))

# 配置文件路径
config_path = os.path.join(bash_path, 'conf')

# 配置文件内容
config = yaml.load(open(os.path.join(config_path, 'config.yaml')))

这里是查询__file__, '../../'向上第二个路径。 反正就是说,来看看config.yaml这个文件。 这里面包含了陈建写的所以的接口信息,找起来方便。 如果需要切换成测试环境的接口,config.yaml \(\to\) config-dev.yamlC:\Users\lijiaxiang\Desktop\秉慧python代码\baichuan\conf\config.yaml中, url_ac_register_channel: http://userac.ppdapi.com/v2/useracregisterchannel/%(userid)s?appid=%(appid)s 因此就找到了这个接口,发现两个标签信息。

5 搞host

要搞host,这个时候一个用来配置接口的,一个用来读接口的。

switchhost找一下,这是个软件,作用如下。

需要切换host来访问到一个特定IP地址,那这个软件将帮你节省很多时间,高效率开发。

什么是hosts文件? 简单的说,hosts文件是用于本地dns服务(相关主题:什么是DNS缓存,如何清除DNS缓存?)的,采用ip 域名的格式写在一个文本文件当中,Hosts是一个没有扩展名的系统文件,可以用记事本等工具打开,其作用就是将一些常用的网址域名与其对应的IP地址建立一个关联“数据库”,当用户在浏览器中输入一个需要登录的网址时,系统会首先自动从Hosts文件中寻找对应的IP地址,一旦找到,系统会立即打开对应网页,如果没有找到,则系统再会将网址提交DNS域名解析服务器进行IP地址的解析。

简单来说,有些网址登陆不上去,你需要配置一下,但是配置一下要来回切换,很麻烦,这个时候,就需要一个自动切换的软件,这就是SwitchHosts的功能之处,而且还可以自由切换,应该加个前缀Auto

安装好后,把接口信息粘贴上去。 ( SwtichHosts快速切换Host - CSDN博客 ) follow这里的思路, 然后拨动下左侧的按钮,变绿了,就是开始切换了。 这个时候就可以用浏览器访问网址了。

http://userac.ppdapi.com:8083/swagger-ui.html 这里的8083是测试环境,一般默认80,这也可以找例子。 config.yamlconfig-dev.yaml 里面,相似的,这个地方就是不一样,可以比较比较找找感觉,搞这些,内行看逻辑,我们外行看感觉吧。

因此总结一下,就是把 测试环境: 172.17.3.24 userac.ppdapi.com粘贴在SwitchHosts, 然后浏览器访问 http://userac.ppdapi.com:8083/swagger-ui.html 搞定!

然后阅读开发文档 首次接入,请先阅读 http://confluence.ppdai.com/pages/viewpage.action?pageId=10071880 这个开发文档很简单,主要包含三个信息

  • 访问链接
  • 接口host
  • 例子

就是说,如果搞不懂,下个postman,快递小哥在不动Python的情况下,也可以实现Python的能力。 这种能力就是input标签,最后返回字段。

6 postman

config.yaml 打开这个文档,这个文档记录了所有的接口。

#  desc: 获取 ppdai_audit..preAudit 表的数据
#  doc: http://confluence.ppdai.com/pages/viewpage.action?pageId=9382058
url_pre_audit: http://queryapi.listings.ppdaicorp.com/listLoanService/getPreAuditList

preaudit开始搞起来。

对这个文件查询,

因此打开postman,选择POST,粘贴网址, http://queryapi.listings.ppdaicorp.com/listLoanService/getPreAuditList , 打开Body, 选择raw 选择JSON格式, 输入我们想要的测试的用户。 applicationId这个就是权限,没有就撸不到数据,是陈建部门的代码。

请求参数

  • applicationId: 不能都为空
  • borrowerId: 不能都为空
  • listingId:不能都为空

然后在hostman中找到了结果。

{
    "result": 1,
    "resultMessage": "没有查询到满足条件的数据",
    "itemList": null
}

7 Python中执行

迅速找到preAudit的构建函数。

C:\Users\lijiaxiang\Desktop\秉慧python代码\baichuan\itf\preaudit.py

    headers = {"Content-type": "application/json", "Accept": "application/json"}
    timeout = 10
    data = {
        'applicationId': ,
    }

这些都不需要改,header不想解释,永远不会知道。 timeout延迟时间,容忍度。 applicationId陈建的番号。

    data = json.dumps(data)
    response = requests.post(url, data, headers=headers, timeout=timeout).text
    response = json.loads(response)

骚操作在这,等价于postman。 因为这是POST的格式,因此打法是json.dumps(data)data有啥呢?

    data = {
        'applicationId': ,
    }
    if borrowerid is not None:
        data['borrowerId'] = borrowerid
    if listingid is not None:
        data['listingId'] = listingid

别忘了,import json

import json
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
url = 'http://queryapi.listings.ppdaicorp.com/listLoanService/getPreAuditList'
data = {
    'applicationId': ,
    'borrowerId': 1,
    'listingId': 1111
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
print(response)

模拟的用户数据输入好了。

结果出来了,

{'result': 1, 'resultMessage': '没有查询到满足条件的数据', 'itemList': None}


8 写接口函数

这里是最重要的了,前面的重要只有5%,这里是95%,因此是在创新。

打开笔记,看看要写哪些接口。

  • ods.ppdai_new_invest_extend_paimoneyrecords: ppdai_new_invest_extend.paimoneyrecords
  • ods.zmxy_score: ppdai_user_third_data.zmxy_score
  • ods.mobileapps 宋昉
  • ods.ppdai_user_log_userupdateinfologs: ppdai_user_log.userupdateinfologs

先搞最烦的,宋昉和何岳的。

通过userid入库时间查询用户mobileApps信息的接口是: /rest/getAppsByUserIdAndTime

卧槽,这个怎么拿app数据啊,我到底线下表怎么搞来着? appcompany like '%tencent%' and appcompany like '%weixin%' 因此也是抓取的。 没关系,我们试一试。 5786718用一个userid来做例子。

172.17.4.82, readapp.ppdapi.com找好了。

所以还差一个调用的链接。

就是readapp.ppdapi.com + /rest/getAppsByUserIdAndTime, 看了下写的例子:

{
    "userId":346,
    "guId":"hgahahah",
    "startDate":"2015-06-01 00:00:00",
    "endDate":"2017-06-01 00:00:00"
}

注意啊,/rest/getAppsByUserIdAndTime前面的POST不要乱加。

现在尝试在Python里面实现。

"guId":"hgahahah"这个可以没有啊! 何岳跟我说,guID用工程名称。

import json
import pandas as pd
def mobileapps(userId=None):
    headers = {"Content-type": "application/json", "Accept": "application/json"}
    timeout = 10
    url = "http://readapp.ppdapi.com/rest/getAppsByUserIdAndTime"
    data = {
            "userId":userId,
            # "guId":guId, 不需要
            "startDate":"2017-01-01 00:00:00",
            "endDate":"2017-06-01 00:00:00"
        }
    data = json.dumps(data)
    response = requests.post(url, data, headers=headers, timeout=timeout).text
    response = json.loads(response)
    df = pd.DataFrame(response)
    df = df[['AppCompany','InsertTime','UserId']]
    if not df.empty:
        df.columns = map(lambda x: x.lower(), df.columns)
    else:
        all_columns=['appcompany', 'inserttime', 'userid']
        df = pd.DataFrame(columns=all_columns)
    return df

if __name__ == '__main__':
    a = mobileapps(userId=346)#, listingids=[100076, 105709, 105890, 106452])
    #a = preaudit(16981)
    print(a)

第一个终于写好了!!!

之后就撸吧!


9 ListingAutoFlowSysProcess

import json
import pandas as pd
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 100
url = "http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowSysProcess"
data = {
    "UserIds":[24739192,24739203],
    "Amounts":[8000] 
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
# df = pd.DataFrame(response)
response.items()
# pd.DataFrame(response.items())
# df = pd.DataFrame.from_dict(response)
# df.head()
# 文档: http://confluence.ppdai.com/pages/viewpage.action?pageId=10881075
# 测试环境: 172.17.3.9 loansd.ppdapi.com

#default_url = 'http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowUserProcess'

首先配置好接口,然后写Python。

import json
import pandas as pd
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 100
url = "http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowUserProcess"

起手定义好要用的包、headers、timeout和接入api。

定义好要取用的userid向量。

data = {
    "UserIds":[24739192,24739203]
}

因为response的结果是一个字典,字典的第一个value是字典,其他我们都不需要,因此提取第一个value,然后转pd.DataFrame,就friendly read了。

data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)['Result']

对了json.loads(response)['Result']list格式,机智的一笔。

全部代码如下:

import json
import pandas as pd
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 100
url = "http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowUserProcess"
data = {
    "UserIds":[24739192,24739203]
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)['Result']
# type(response)
df = pd.DataFrame(response)
# response.items()
# pd.DataFrame(response.items())
# df = pd.DataFrame.from_dict(response)
df.head()

10 ods.zmxy_score

172.17.4.983rdreadapi.ppdapi.com 找到了!

dict通过json.dumps转化 就成了json

这个appid要是appid有误,只能找业务提供方了,陈建也没办法。

import json
import pandas as pd
import requests
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
url = "http://3rdreadapi.ppdapi.com/queryData"
data = {

    "appid": "",

    "userid": "50070214",

    "options": "zhimaquery",

    "version": "1.0.0",

    "async": "1",

    "atom": 0,

    "role": 0

}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
response
{'code': 0,
 'data': {'zhimaquery': {'bizNo': 'ZM201702153000000217600651746412',
   'id': 7667,
   'inserttime': '2018-01-02 11:21:17',
   'isactive': 1,
   'score': ,
   'updatetime': '2018-01-02 11:21:17',
   'userid': 50070214}},
 'message': 'success',
 'version': None}

11 sql问题

def get_commondaily(userid, engine=engine_cbd):
    mod = userid % 10
    sql = '''
    select 
        user_id, cmstr_reg_channl, cmstr_sec_lev, cmstr_idnm_gen, cmstr_idnm_age,
        cmstr_idnm_dst, cmstr_idnm_pro, cmstr_idnm_city, cmstr_pho_pro, 
        cmstr_pho_cit, cmstr_pho_cit_lev, cmstr_pho_opr, cmstr_edu_sef, 
        cmstr_edu_cer, cmstr_edu_typ, cmstr_edu_sch, cmstr_enr_cer, cmstr_enr_typ,
        cmstr_enr_sch, cmstr_mar, cmstr_occ, cmstr_occ_lif, 
        cmstr_pho_mod_lt, cmstr_pho_os_lt, cmstr_pho_os_ver_lt, cmstr_pho_net_lt,
        cmstr_app_reg_channl
    from ppdai_app_score_biz.common_user_daily000{}
    where user_id = {}
    '''.format(mod, userid)
    df = pd.read_sql(sql, engine)
    return df

这里% 10 表示连续扫描10个模。

12 其他

  • 另外,敏感信息都过滤了,code可以重新拿。
  • Appid,芝麻分。

12.1 dict \(\to\) pd.DataFrame

In [11]: pd.DataFrame(d.items())  # or list(d.items()) in python 3
Out[11]:
             0    1
0   2012-07-02  392
1   2012-07-06  392
2   2012-06-29  391
3   2012-06-28  391
...

In [12]: pd.DataFrame(d.items(), columns=['Date', 'DateValue'])
Out[12]:
          Date  DateValue
0   2012-07-02        392
1   2012-07-06        392
2   2012-06-29        391

我需要再搞几个接口函数,因为不是统一的,比如中台和其他的不一样。 然后修改下文档,太乱。 这个只是解释陈建的代码文档,并非要复盘,因此看完后,就可以copy陈建代码,自己写函数了。

12.2 ValueError: If using all scalar values, you must pass an index

一个key只有一个value的字典如果直接转化成数据框会报错。

pd.DataFrame.from_dict(response['data']['zhimaquery'], orient='index').T 加入orient='index'且转置.T

12.3 for loop

172.17.4.8loansd.ppdapi.com 是host。

df_2=pd.concat(a)为了merge两个pd.DataFrame合并。 type(a)的格式是list,所以可以merge。

12.4 %s的使用

str = 'I love %s and %s, he loves %s and %s.' 
str % ('ha','haha','hahaha','hahahaha')

%就是这么用,'I love ha and haha, he loves hahaha and hahahaha.'看句子。 哈哈真机智。 ( python about multiple %s in a string - Stack Overflow )

The d in %d stands for decimal. %d is for formatting numbers. %s is for formatting strings.

12.5 df.to_dict要具体情况具体分析

In [13]: df.to_dict('Records')[0]
Out[13]: {'a': 1, 'b': 2}