• 欢迎访问蜷缩的蜗牛博客 蜷缩的蜗牛
  • 微信搜索: 蜷缩的蜗牛 | 联系站长 kbsonlong@qq.com
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧
  • 本站已开通微信小程序,可以扫描底部二维码关注,或者搜索alongparty!!

Flower API


# Create your tests here.

#help : https://flower-docs-cn.readthedocs.io/zh/latest/api.html
#Celery Manage Flower

import requests
import json


class Celery_Flower(object):
    def __init__(self,url):
        self.__base_url = url .strip('/')

    def PostRequest(self,prefix,data=None):
        url = '{}/{}'.format(self.__base_url,prefix)
        print(url)
        if data:
            res = requests.post(url, data)
        else:
            res = requests.get(url)
        return res

    def Task_list(self):
        prefix = 'task/types'
        tasks = self.PostRequest(prefix)
        return tasks.json()['task-types']

    def Add_task(self,task_name,args=None):
        prefix = 'task/async-apply/{}'.format(task_name)
        data = json.dumps({"args":args.split(',')})
        res = self.PostRequest(prefix,data)
        return res

    def Task_result(self,taskid):
        prefix = 'task/result/{}'.format(taskid)
        res = self.PostRequest(prefix)
        return res

if __name__ == '__main__':
    cf = Celery_Flower(url='http://192.168.56.101:5555/api')
    # res = cf.Add_task('celery_tasks.tasks.Salt_User','19.168.56.101,zengshenglong')
    # taskid = res.json()['task-id']
    # results = cf.Task_result(taskid)
    # print(results.json())
    task_list = cf.Task_list()
    for task in task_list:
        print(task)

蜷缩的蜗牛 , 版权所有丨如未注明 , 均为原创丨 转载请注明Flower API
喜欢 (0)
[]
分享 (0)
蜷缩的蜗牛
关于作者:
IT运维工程师,熟练使用复制、粘贴技能;平时爱好打羽毛球,虽然很菜,也喜欢出去旅游!个人公众号:蜷缩的蜗牛,欢迎关注订阅~~网站小程序:alongparty,支持搜索博客内容,订阅专题,评论、点赞等等

您必须 登录 才能发表评论!