2 min read

Python接口函数-中台

本文于2020-10-10更新。 如发现问题或者有建议,欢迎提交 Issue

1 中台数据提取

# coding=utf8

import requests
import pandas as pd
import json
from model.baichuan.func.utils import config

import的包没什么变化。

  • desc:
  • doc:
  • url_zhongtai:

中台也有config的。

  • 测试环境的hosts: 172.17.4.29
  • readapi.user.ppdaicorp.com

1.1 class函数

class UserReadService:
    def __init__(self, url = config['url_zhongtai'],
                 data = defaultQueryRequestData,
                 headers=defaultQueryRequestHeaders,
                 contentStr = defaultContentStr,
                 timeout = 10
                 ):
        # 接口url
        self.url = url
        self.data = data
        self.headers = headers
        self.contentStr = contentStr
        self.timeout = timeout

这里class UserReadService表达一类函数。

class UserReadService:
    def __init__(self, url = config['url_zhongtai'],
                 data = defaultQueryRequestData,
                 headers=defaultQueryRequestHeaders,
                 contentStr = defaultContentStr,
                 timeout = 10
                 ):
        # 接口url
        self.url = url
        self.data = data
        self.headers = headers
        self.contentStr = contentStr
        self.timeout = timeout

这个地方的

  • defaultQueryRequestData
  • defaultQueryRequestHeaders
  • defaultContentStr

都是有一大串定义好的。

class MyClass(object):
     i = 123
     def __init__(self):
         self.i = 345

a = MyClass()
print a.i
345
print MyClass.i
123

以函数为例。

def getPageUserData(self,users, options, pageNum, pageSize=200):
    if len(users) == 0:
        return pd.DataFrame()
    contents = self.contentStr % (options, str(users)[1:-1], str(pageNum), str(pageSize))
    data = self.data % (options,contents)
    request_data = requests.post(self.url,data = data,headers = self.headers, timeout=self.timeout).text
    request_data = json.loads(request_data)
    if request_data['code'] == 0:
        response = request_data['data'][options]['data']
        pages = request_data['data'][options]['pageInfo']['pages']
    else:
        raise Exception(request_data['message'])
    res = []
    for ele in response:
        if len(ele)!=0:
            res.append(ele)
    df = pd.DataFrame.from_dict(res)
    if len(df) != 0:
        df.columns = map(lambda x: x.lower(),df.columns)
        df.columns = map(lambda x: x.lower(),df.columns)

    return df, pages

1.2 重点

这是最核心的函数 request_data = requests.post(self.url,data = data,headers = self.headers, timeout=self.timeout).text

经陈建提醒,这个接口函数,最重要的就是这句话。 request_data = requests.post(self.url,data = data,headers = self.headers, timeout=self.timeout).text其他都是在这个基础上搞的,不要跑偏了。 要是跟大家讲接口的东西,这个就是最重要的了!

1.3 举例

data = '''
'''
headers = {"Content-type":"application/json"}
print(data)

这里要字符串话,否则data就是dict格式了。

当然也可以使用dictjson的方式 用json.dumps实现。

data1 = json.dumps(data1)
type(data1)

格式就变成str了。

request_data = requests.post('http://readapi.user.ppdaicorp.com/queryData',data = data1,headers = headers, timeout=10).text
print(request_data)
request_data = json.loads(request_data)
print(request_data)
pd.DataFrame.from_dict(request_data)

就好了。

pd.DataFrame(pd.DataFrame.from_dict(request_data).iloc[0,1]['data'])的结果就是很好的结果。