最近写了较多的 Python 脚本,将最近自己写的脚本进行一个总结,其中有些是 Python 独有的,有些是所有程序设计中共有的:
`__str__` 函数用于打印输出(如果不实现 `__str__`,会调用 `__repr__`);如果对象会被放到 collection 中,还需要实现 `__repr__` 函数,这样在打印整个 collection 的时候才能直观显示。
上述总结肯定有片面的地方,也有不全的地方,欢迎指出。
附上一份 Python2.7 代码(将一些私有的东西进行了修改)
# -*- coding:utf-8 -*-
"""Find and kill duplicated jobs running on YARN.

The script compares the applications YARN reports for one resource queue
with the application ids recorded in MySQL.  A YARN application whose id is
not recorded in MySQL, and whose name does not start with "test", is treated
as a duplicated job and killed through the private ``Client`` module.

Written for Python 2.7.
"""
import logging
from logging.config import fileConfig

import requests
from sqlalchemy import create_engine

# Private module.  The original imported the misspelled name ``Clinet`` while
# the code below calls ``Client.kill_job``; the resulting NameError was
# swallowed by the broad ``except`` in kill_duplicated_jobs, so no job was
# ever killed.
import Client

fileConfig("logging_config.ini")
logger = logging.getLogger("killduplicatedjob")

# The configuration below could live in a module of its own.
DB_USER = "xxxxxxx"
DB_PASSWORD = "xxxxxxxx"
DB_PORT = 111111
DB_HOST_PORT = "xxxxxxxxxx"
DB_DATA_BASE = "xxxxxxxxxxx"
REST_API_URL = "http://sample.com"
# Upper bound for the YARN REST call; without it ``requests`` waits forever
# on a stalled server.
REQUEST_TIMEOUT_SECONDS = 30

engine = create_engine("mysql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST_PORT, DB_PORT, DB_DATA_BASE))


class DuplicatedJobs(object):
    """Record describing one duplicated job.

    The class exists so that code passing a job between functions does not
    need to know the order of its attributes.  It could also live in a
    separate module.
    """

    def __init__(self, app_id, app_name, user):
        self.app_id = app_id
        self.app_name = app_name
        self.user = user

    def __repr__(self):
        # __repr__ (not only __str__) is what gets used when a list of these
        # objects is formatted, e.g. in the log lines below.
        return '[appid:%s, app_name:%s, user:%s]' % (self.app_id, self.app_name, self.user)


def find_duplicated_jobs():
    """Return the list of ``DuplicatedJobs`` that should be killed.

    An application reported by YARN is a duplicate when its id is not among
    the ids recorded in MySQL and its name does not start with "test".
    Applications whose name has no known user are skipped with a warning,
    because they cannot be killed on behalf of anybody.
    """
    logger.info("starting find duplicated jobs")
    running_apps, app_name_to_user = get_all_running_jobs()
    # A set makes the membership test O(1) per YARN application.
    running_app_ids = set(running_apps)
    all_apps_on_yarn = get_apps_from_yarn_with_queue(get_resource_queue())

    duplicated_jobs = []
    for app_id, app_name in all_apps_on_yarn:
        if app_id in running_app_ids:
            continue
        if app_name.startswith("test"):
            logger.info("Job[%s] is a test job, would not kill it", app_name)
            continue
        logger.info("find a duplicated job, prefixed_name[%s] with appid[%s]", app_name, app_id)
        user = app_name_to_user.get(app_name)
        if user is None:
            # The original indexed the dict directly and died with KeyError,
            # which aborted the whole run for a single unknown name.
            logger.warning("No user found for job[%s] with appid[%s], skip it", app_name, app_id)
            continue
        duplicated_jobs.append(DuplicatedJobs(app_id, app_name, user))

    logger.info("Find duplicated jobs [%s]", duplicated_jobs)
    return duplicated_jobs


def get_apps_from_yarn_with_queue(queue):
    """Fetch the applications YARN reports for ``queue``.

    Returns a list of ``(app_id, app_name)`` tuples.  On any request or
    payload error the error is logged and whatever was collected so far
    (possibly an empty list) is returned, so a failed fetch never causes a
    job to be killed.
    """
    apps_on_yarn = []
    try:
        # The request itself is inside the ``try`` so connection errors and
        # timeouts are logged instead of crashing the script.
        response = requests.get(REST_API_URL, params={"queue": queue}, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        # "apps" may be null in the payload when nothing is running; treat
        # that as "no applications" rather than as an error.
        jobs = response.json().get("apps") or {}
        for app in jobs.get("app") or []:
            apps_on_yarn.append((app.get("id"), app.get("name")))
    except requests.RequestException as e:
        # Connection failure, timeout or non-2xx HTTP status.
        logger.exception("Get apps from Yarn Error, message[%s]", e)
    except (ValueError, AttributeError, TypeError) as e:
        # Body is not JSON, or the JSON does not have the expected shape.
        logger.exception("Get apps from Yarn Error, message[%s]", e)
    logger.info("Fetch all apps from Yarn [%s]", apps_on_yarn)
    return apps_on_yarn


def get_all_running_jobs():
    """Read the recorded running jobs from MySQL.

    Returns a tuple ``(app_ids, app_name_to_user)``: the list of recorded
    application ids, and a dict mapping topology name to the user taken
    from the second dot-separated field of the configured resource queue.

    NOTE(review): the SQL statements are redacted placeholders; the real
    queries presumably filter by ``topology_id`` -- confirm against the
    private version.
    """
    job_infos = get_result_from_mysql("select * from xxxx where xx=yy")
    app_ids = []
    app_name_to_user = {}
    for (topology_id, topology_name) in job_infos:
        status_set = get_result_from_mysql("select * from xxxx where xx=yy")
        # Deliberately unguarded: if the status row is missing we would
        # rather fail here than leave a running job out of ``app_ids`` and
        # have it killed as a "duplicate".
        application_id = status_set[0][0]
        if application_id != "":
            configed_resource_queue = get_result_from_mysql(
                "select * from xxxx where xx=yy")
            app_ids.append(application_id)
            app_name_to_user[topology_name] = configed_resource_queue[0][0].split(".")[1]
    logger.info("All running jobs appids[%s] topology_name2user[%s]", app_ids, app_name_to_user)
    return app_ids, app_name_to_user


def kill_duplicated_jobs(duplicated_jobs):
    """Kill every job in ``duplicated_jobs`` via ``Client.kill_job``.

    A failure to kill one job is logged (with traceback) and does not stop
    the remaining jobs from being processed.
    """
    for job in duplicated_jobs:
        app_id = job.app_id
        app_name = job.app_name
        user = job.user
        logger.info("try to kill job[%s] with appid[%s] for user[%s]", app_name, app_id, user)
        try:
            Client.kill_job(app_id, user)
        except Exception:
            # Top-level boundary: keep going with the other jobs, but record
            # why this one failed instead of dropping the exception.
            logger.exception("Can't kill job[%s] with appid[%s] for user[%s]", app_name, app_id, user)
        else:
            logger.info("Job[%s] with appid[%s] for user[%s] has been killed", app_name, app_id, user)


def get_result_from_mysql(sql):
    """Execute ``sql`` on the module-level engine and return all rows."""
    result = engine.execute(sql)
    return result.fetchall()


# The resource queue may change and may involve some specific logic, so it is
# extracted into a function of its own.
def get_resource_queue():
    """Return the name of the resource queue to inspect."""
    return "xxxxxxxxxxxxx"


if __name__ == "__main__":
    kill_duplicated_jobs(find_duplicated_jobs())
其中 logger 配置文件如下(对于 Python 的 logger,官方文档写的非常好,建议读一次,并且实践一次)
[loggers]
keys=root, simpleLogger

[handlers]
keys=consoleHandler, logger_handler

[formatters]
keys=formatter

[logger_root]
level=WARN
handlers=consoleHandler

[logger_simpleLogger]
level=INFO
handlers=logger_handler
propagate=0
qualname=killduplicatedjob

[handler_consoleHandler]
class=StreamHandler
level=WARN
formatter=formatter
args=(sys.stdout,)

[handler_logger_handler]
class=logging.handlers.RotatingFileHandler
level=INFO
formatter=formatter
args=("kill_duplicated_streaming.log", "a", 52428800, 3,)

[formatter_formatter]
format=%(asctime)s %(name)-12s %(levelname)-5s %(message)s