本文于2020-10-10更新。 如发现问题或者有建议,欢迎提交 Issue
#df_register = source_register(userids)
df_register = pd.read_pickle('C:/Users/Python_bc/baichuan_model/df_register')
df_register = df_register[df_register['userid'] == userid]
1 翻译sql的心得
- left join 其实不麻烦,全部join起来,再处理 select 的变量。
- 详单表翻译完一张后,热情和习惯来了,继续搞。
- 表的逻辑需要一点点update问清楚。
2 source_register
函数定义
# 所有的数据源
@data_source_logs()
def source_register(userids):
"""
数据源: register, 获取ac渠道信息
"""
df = user_ac_register_channel(userids)
df = _default_columns(df, ['device', 'insert_time', 'reg_chanel', 'userid'])
df.columns = ['device', 'inserttime', 'regchanel', 'userid']
df = _to_datetime(df, ['inserttime'])
return df
reg_chanel
看到这种真的想打人。
3 装饰器解释
这里,@data_source_logs()
是啥意思?
所以@
是装饰器, 他的作用是记录日志。
from utils import data_source_logs
找到你了!
def data_source_logs():
"""
数据源日志的装饰器
"""
def real_decorator(raw_fun):
source_name = raw_fun.__name__
def wrapper(*args, **kwargs):
logger.info('开始获取数据源: %s ...' % source_name)
try:
df = raw_fun(*args, **kwargs)
logger.info('数据源: %s 获取完成.' % source_name)
except Exception as e:
logger.error('数据源: %s 获取失败. 失败原因: %s' % (source_name, traceback.format_exc()))
raise e
return df
return wrapper
return real_decorator
总之一句话, 就是为了跑日志,告诉哪段代码跑完了。
4 找接口信息
user_ac_register_channel
先搞清这个函数。
在这里from model.baichuan.itf.user_ac_register_channel import user_ac_register_channel
。
# encoding: utf-8
# 从接口获取ac 渠道注册信息
# 接口文档: http://userac.ppdapi.com:8083/swagger-ui.html
# 董长龙
# 测试环境: 172.17.3.24 userac.ppdapi.com
# 陈建
################# 输入示例 #############
# 输入: df_userid,类型为 dataframe, 主要包含两个字段: userid, listingtime
出现了接口信息了! 如果你看完了文档,你就觉得这个信息写的很详细了,真的,这是一个好的程序员写的。
这是我唯一的底气。
问题来了,这里的config['url_ac_register_channel']
是啥意思?
import yaml
import os
import logging.config
import traceback
from model.baichuan.conf.logging_conf import logging_config
# 项目路径
bash_path = os.path.abspath(os.path.join(__file__, '../../'))
# 配置文件路径
config_path = os.path.join(bash_path, 'conf')
# 配置文件内容
config = yaml.load(open(os.path.join(config_path, 'config.yaml')))
这里是查询__file__, '../../'
向上第二个路径。
反正就是说,来看看config.yaml
这个文件。
这里面包含了陈建写的所以的接口信息,找起来方便。
如果需要切换成测试环境的接口,config.yaml
\(\to\) config-dev.yaml
在C:\Users\lijiaxiang\Desktop\秉慧python代码\baichuan\conf\config.yaml
中,
url_ac_register_channel: http://userac.ppdapi.com/v2/useracregisterchannel/%(userid)s?appid=%(appid)s
因此就找到了这个接口,发现两个标签信息。
5 搞host
要搞host,这个时候一个用来配置接口的,一个用来读接口的。
switchhost
找一下,这是个软件,作用如下。
需要切换host来访问到一个特定IP地址,那这个软件将帮你节省很多时间,高效率开发。
什么是hosts文件? 简单的说,hosts文件是用于本地dns服务(相关主题:什么是DNS缓存,如何清除DNS缓存?)的,采用ip 域名的格式写在一个文本文件当中,Hosts是一个没有扩展名的系统文件,可以用记事本等工具打开,其作用就是将一些常用的网址域名与其对应的IP地址建立一个关联“数据库”,当用户在浏览器中输入一个需要登录的网址时,系统会首先自动从Hosts文件中寻找对应的IP地址,一旦找到,系统会立即打开对应网页,如果没有找到,则系统再会将网址提交DNS域名解析服务器进行IP地址的解析。
简单来说,有些网址登陆不上去,你需要配置一下,但是配置一下要来回切换,很麻烦,这个时候,就需要一个自动切换的软件,这就是SwitchHosts
的功能之处,而且还可以自由切换,应该加个前缀Auto
。
安装好后,把接口信息粘贴上去。 ( SwtichHosts快速切换Host - CSDN博客 ) follow这里的思路, 然后拨动下左侧的按钮,变绿了,就是开始切换了。 这个时候就可以用浏览器访问网址了。
http://userac.ppdapi.com:8083/swagger-ui.html
这里的8083
是测试环境,一般默认80
,这也可以找例子。
config.yaml
和
config-dev.yaml
里面,相似的,这个地方就是不一样,可以比较比较找找感觉,搞这些,内行看逻辑,我们外行看感觉吧。
因此总结一下,就是把
测试环境: 172.17.3.24 userac.ppdapi.com
粘贴在SwitchHosts
,
然后浏览器访问
http://userac.ppdapi.com:8083/swagger-ui.html
搞定!
然后阅读开发文档
首次接入,请先阅读 http://confluence.ppdai.com/pages/viewpage.action?pageId=10071880
这个开发文档很简单,主要包含三个信息
- 访问链接
- 接口host
- 例子
就是说,如果搞不懂,下个postman,快递小哥在不动Python的情况下,也可以实现Python的能力。 这种能力就是input标签,最后返回字段。
6 postman
config.yaml
打开这个文档,这个文档记录了所有的接口。
# desc: 获取 ppdai_audit..preAudit 表的数据
# doc: http://confluence.ppdai.com/pages/viewpage.action?pageId=9382058
url_pre_audit: http://queryapi.listings.ppdaicorp.com/listLoanService/getPreAuditList
对preaudit
开始搞起来。
对这个文件查询,
因此打开postman,选择POST
,粘贴网址,
http://queryapi.listings.ppdaicorp.com/listLoanService/getPreAuditList
,
打开Body
,
选择raw
选择JSON
格式,
输入我们想要的测试的用户。
applicationId
这个就是权限,没有就撸不到数据,是陈建部门的代码。
请求参数
- applicationId: 不能都为空
- borrowerId: 不能都为空
- listingId:不能都为空
然后在hostman中找到了结果。
{
"result": 1,
"resultMessage": "没有查询到满足条件的数据",
"itemList": null
}
7 Python中执行
迅速找到preAudit
的构建函数。
C:\Users\lijiaxiang\Desktop\秉慧python代码\baichuan\itf\preaudit.py
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
data = {
'applicationId': ,
}
这些都不需要改,header不想解释,永远不会知道。
timeout延迟时间,容忍度。
applicationId
陈建的番号。
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
骚操作在这,等价于postman。
因为这是POST
的格式,因此打法是json.dumps(data)
,data
有啥呢?
data = {
'applicationId': ,
}
if borrowerid is not None:
data['borrowerId'] = borrowerid
if listingid is not None:
data['listingId'] = listingid
别忘了,import json
import json
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
url = 'http://queryapi.listings.ppdaicorp.com/listLoanService/getPreAuditList'
data = {
'applicationId': ,
'borrowerId': 1,
'listingId': 1111
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
print(response)
模拟的用户数据输入好了。
结果出来了,
{'result': 1, 'resultMessage': '没有查询到满足条件的数据', 'itemList': None}
8 写接口函数
这里是最重要的了,前面的重要只有5%,这里是95%,因此是在创新。
打开笔记,看看要写哪些接口。
ods.ppdai_new_invest_extend_paimoneyrecords
:ppdai_new_invest_extend.paimoneyrecords
ods.zmxy_score
:ppdai_user_third_data.zmxy_score
ods.mobileapps
宋昉ods.ppdai_user_log_userupdateinfologs
:ppdai_user_log.userupdateinfologs
先搞最烦的,宋昉和何岳的。
通过userid
和入库时间
查询用户mobileApps
信息的接口是:
/rest/getAppsByUserIdAndTime
卧槽,这个怎么拿app数据啊,我到底线下表怎么搞来着?
appcompany like '%tencent%' and appcompany like '%weixin%'
因此也是抓取的。
没关系,我们试一试。
5786718
用一个userid来做例子。
172.17.4.82
, readapp.ppdapi.com
找好了。
所以还差一个调用的链接。
就是readapp.ppdapi.com
+ /rest/getAppsByUserIdAndTime
,
看了下写的例子:
{
"userId":346,
"guId":"hgahahah",
"startDate":"2015-06-01 00:00:00",
"endDate":"2017-06-01 00:00:00"
}
注意啊,/rest/getAppsByUserIdAndTime
前面的POST不要乱加。
现在尝试在Python里面实现。
"guId":"hgahahah"
这个可以没有啊!
何岳跟我说,guID
用工程名称。
import json
import pandas as pd
def mobileapps(userId=None):
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
url = "http://readapp.ppdapi.com/rest/getAppsByUserIdAndTime"
data = {
"userId":userId,
# "guId":guId, 不需要
"startDate":"2017-01-01 00:00:00",
"endDate":"2017-06-01 00:00:00"
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
df = pd.DataFrame(response)
df = df[['AppCompany','InsertTime','UserId']]
if not df.empty:
df.columns = map(lambda x: x.lower(), df.columns)
else:
all_columns=['appcompany', 'inserttime', 'userid']
df = pd.DataFrame(columns=all_columns)
return df
if __name__ == '__main__':
a = mobileapps(userId=346)#, listingids=[100076, 105709, 105890, 106452])
#a = preaudit(16981)
print(a)
第一个终于写好了!!!
之后就撸吧!
9 ListingAutoFlowSysProcess
import json
import pandas as pd
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 100
url = "http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowSysProcess"
data = {
"UserIds":[24739192,24739203],
"Amounts":[8000]
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
# df = pd.DataFrame(response)
response.items()
# pd.DataFrame(response.items())
# df = pd.DataFrame.from_dict(response)
# df.head()
# 文档: http://confluence.ppdai.com/pages/viewpage.action?pageId=10881075
# 测试环境: 172.17.3.9 loansd.ppdapi.com
#default_url = 'http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowUserProcess'
首先配置好接口,然后写Python。
import json
import pandas as pd
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 100
url = "http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowUserProcess"
起手定义好要用的包、headers、timeout和接入api。
定义好要取用的userid向量。
data = {
"UserIds":[24739192,24739203]
}
因为response
的结果是一个字典,字典的第一个value是字典,其他我们都不需要,因此提取第一个value,然后转pd.DataFrame
,就friendly read了。
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)['Result']
对了json.loads(response)['Result']
是list
格式,机智的一笔。
全部代码如下:
import json
import pandas as pd
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 100
url = "http://loansd.ppdapi.com/Lightning/QueryService/QueryListingAutoFlowUserProcess"
data = {
"UserIds":[24739192,24739203]
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)['Result']
# type(response)
df = pd.DataFrame(response)
# response.items()
# pd.DataFrame(response.items())
# df = pd.DataFrame.from_dict(response)
df.head()
10 ods.zmxy_score
172.17.4.98
和3rdreadapi.ppdapi.com
找到了!
dict
通过json.dumps
转化
就成了json
。
这个appid
要是appid
有误,只能找业务提供方了,陈建也没办法。
import json
import pandas as pd
import requests
headers = {"Content-type": "application/json", "Accept": "application/json"}
timeout = 10
url = "http://3rdreadapi.ppdapi.com/queryData"
data = {
"appid": "",
"userid": "50070214",
"options": "zhimaquery",
"version": "1.0.0",
"async": "1",
"atom": 0,
"role": 0
}
data = json.dumps(data)
response = requests.post(url, data, headers=headers, timeout=timeout).text
response = json.loads(response)
response
{'code': 0,
'data': {'zhimaquery': {'bizNo': 'ZM201702153000000217600651746412',
'id': 7667,
'inserttime': '2018-01-02 11:21:17',
'isactive': 1,
'score': ,
'updatetime': '2018-01-02 11:21:17',
'userid': 50070214}},
'message': 'success',
'version': None}
11 sql问题
def get_commondaily(userid, engine=engine_cbd):
mod = userid % 10
sql = '''
select
user_id, cmstr_reg_channl, cmstr_sec_lev, cmstr_idnm_gen, cmstr_idnm_age,
cmstr_idnm_dst, cmstr_idnm_pro, cmstr_idnm_city, cmstr_pho_pro,
cmstr_pho_cit, cmstr_pho_cit_lev, cmstr_pho_opr, cmstr_edu_sef,
cmstr_edu_cer, cmstr_edu_typ, cmstr_edu_sch, cmstr_enr_cer, cmstr_enr_typ,
cmstr_enr_sch, cmstr_mar, cmstr_occ, cmstr_occ_lif,
cmstr_pho_mod_lt, cmstr_pho_os_lt, cmstr_pho_os_ver_lt, cmstr_pho_net_lt,
cmstr_app_reg_channl
from ppdai_app_score_biz.common_user_daily000{}
where user_id = {}
'''.format(mod, userid)
df = pd.read_sql(sql, engine)
return df
这里% 10
表示连续扫描10个模。
12 其他
- 另外,敏感信息都过滤了,code可以重新拿。
- Appid,芝麻分。
12.1 dict \(\to\) pd.DataFrame
In [11]: pd.DataFrame(d.items()) # or list(d.items()) in python 3
Out[11]:
0 1
0 2012-07-02 392
1 2012-07-06 392
2 2012-06-29 391
3 2012-06-28 391
...
In [12]: pd.DataFrame(d.items(), columns=['Date', 'DateValue'])
Out[12]:
Date DateValue
0 2012-07-02 392
1 2012-07-06 392
2 2012-06-29 391
我需要再搞几个接口函数,因为不是统一的,比如中台和其他的不一样。 然后修改下文档,太乱。 这个只是解释陈建的代码文档,并非要复盘,因此看完后,就可以copy陈建代码,自己写函数了。
12.2 ValueError: If using all scalar values, you must pass an index
一个key
只有一个value
的字典如果直接转化成数据框会报错。
pd.DataFrame.from_dict(response['data']['zhimaquery'], orient='index').T
加入orient='index'
且转置.T
。
12.3 for
loop
172.17.4.8
和loansd.ppdapi.com
是host。
df_2=pd.concat(a)
为了merge
两个pd.DataFrame
合并。
type(a)
的格式是list
,所以可以merge。
12.4 %s
的使用
str = 'I love %s and %s, he loves %s and %s.'
str % ('ha','haha','hahaha','hahahaha')
%
就是这么用,'I love ha and haha, he loves hahaha and hahahaha.'
看句子。
哈哈真机智。
(
python about multiple %s in a string - Stack Overflow
)
The d
in %d
stands for decimal. %d
is for formatting numbers.
%s
is for formatting strings.
12.5 df.to_dict
要具体情况具体分析
In [13]: df.to_dict('Records')[0]
Out[13]: {'a': 1, 'b': 2}