django订阅gerrit事件流数据(django 消息推送)

网友投稿 317 2022-08-23


django订阅gerrit事件流数据(django 消息推送)

总体思路:

# 1. Custom Django management command (reference: Django's guide on writing
#    custom django-admin commands). Invoked as `python manage.py subscribe <name>`.
import threading

from django.core.management import BaseCommand, CommandError

from subscribe.gerrit.Factory import (
    CreatePatchSetFactory,
    ChangeAbandoneFactory,
    ChangeMergeFactory,
    ChangeRestoreFactory,
    TopicChangeFactory,
)

# Gerrit event name -> factory class whose get_stream_event() listens for it.
# Kept as ordered pairs so the list shown in error messages is stable.
EVENT_FACTORIES = (
    ("patchset-created", CreatePatchSetFactory),
    ("change-merged", ChangeMergeFactory),
    ("change-abandoned", ChangeAbandoneFactory),
    ("change-restored", ChangeRestoreFactory),
    ("topic-changed", TopicChangeFactory),
)

# Pseudo event name that starts every listener at once.
ALL_EVENTS = "all"


class Command(BaseCommand):
    """Management command that subscribes to Gerrit stream-events."""

    help = "subscribe gerrit stream-events:"

    def add_arguments(self, parser):
        """Register the single positional argument: the event stream to listen to."""
        parser.add_argument('name', help="要监听哪个事件流\nall:全部\npatchset-created:创建patchset时\nchange-merged:合入patchset时\nchange-abandoned:废弃patchset时\nchange-restored:恢复patchset时\ntopic-changed:Topic更改时")
        # An optional flag could be registered the same way, e.g.
        # parser.add_argument('-s', '--subscribe', help=...)

    def handle(self, *args, **options):
        """Start the listener(s) selected by the ``name`` argument.

        Raises:
            CommandError: if ``name`` is neither ``all`` nor a supported event.
        """
        # 1. get param
        g_event = options['name']
        factories = dict(EVENT_FACTORIES)
        # "all" must be accepted here; previously it was rejected by this
        # check, which made the "all" branch below unreachable.
        support_events = [ALL_EVENTS] + [name for name, _ in EVENT_FACTORIES]

        # 2. check if param supported
        if g_event not in support_events:
            raise CommandError("args wrong, support are: %s" % support_events)

        if g_event != ALL_EVENTS:
            factories[g_event]().get_stream_event()
            return

        # get_stream_event() blocks inside its listen loop, so the listeners
        # have to run concurrently; calling them one after another would only
        # ever start the first one.
        workers = []
        for name, factory_cls in EVENT_FACTORIES:
            worker = threading.Thread(
                target=factory_cls().get_stream_event,
                name="gerrit-%s" % name,
            )
            # Daemon threads let Ctrl-C on the main thread end the process.
            worker.daemon = True
            worker.start()
            workers.append(worker)

        # Keep the command alive until every listener has finished.
        for worker in workers:
            worker.join()

2、处理各个类型事件流数据逻辑

"""Subscribe to Gerrit stream-events over SSH and mirror change data into the local DB."""
import datetime
import json
import logging
import os
import re
import sys
import time
from abc import ABCMeta, abstractmethod

import paramiko
from django.db import close_old_connections
from gerrit import GerritClient

from cmapp.models import Git
from cmapp.serializers import GitSerializer
from cmback.settings import GERRIT_HOSTNAME, GERRIT_URL, GERRIT_PORT, G_USERNAME, G_PASSWORD

# Directory scanned for the SSH private key (first file whose name ends in "rsa").
# NOTE(review): relative to the current working directory -- presumably the
# command is always started from the project root; confirm.
KEY_DIR = "./subscribe/gerrit/"
# Maximum number of bytes requested from the SSH channel per read.
RECV_BYTES = 1024000
# Pause before reconnecting after the event stream fails.
RECONNECT_DELAY_SECONDS = 5
# Format used for every date string written to the database.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


class GerritFactory(metaclass=ABCMeta):
    """
    subscribe gerrit stream-events, save to local db real time for show
    use:
        "change-abandoned"
        "change-merged"
        "change-restored"
        "topic-changed"
        "patchset-created"
    unused:
        "assignee-changed"
        "hashtags-changed"
        "ref-updated"
        "reviewer-added"
        "reviewer-deleted"
        "comment-added"
        "project-created"
        "change-deleted"

    Subclasses implement get_stream_event(), normally by calling _listen()
    with their event type and a per-event handler.
    """

    def __init__(self):
        self.hostname = GERRIT_HOSTNAME
        self.gerriturl = GERRIT_URL
        self.port = GERRIT_PORT
        self.username = G_USERNAME
        self.passwd = G_PASSWORD
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; consider loading known hosts instead.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.logger = logging.getLogger("subscribe")

    @abstractmethod
    def get_stream_event(self):
        """Listen for this factory's event type and persist every event received."""

    def connectGerrit(self):
        """Open the SSH connection and a session channel (stored on self.channel).

        Uses the first file in KEY_DIR whose name ends with "rsa" as the private
        key. Any failure -- including a missing key file -- is logged and
        terminates via sys.exit(1).
        """
        try:
            key_files = [name for name in os.listdir(KEY_DIR) if name.endswith("rsa")]
            if not key_files:
                # Previously this case fell through silently and surfaced later
                # as an AttributeError on self.channel.
                raise IOError("no private key file ending in 'rsa' found in %s" % KEY_DIR)
            self.client.connect(
                hostname=self.hostname,
                port=self.port,
                username=self.username,
                pkey=paramiko.RSAKey(filename=os.path.join(KEY_DIR, key_files[0])),
            )
            self.channel = self.client.get_transport().open_session(timeout=5)
            self.logger.info("connect to gerrit ok")
        except Exception as e:
            self.logger.error("connect to gerrit failed, %s" % e)
            sys.exit(1)

    def getIssues(self, data):
        """Extract issue references from the "Issue: ..." lines of a commit message.

        Returns the list of regex matches; when there is exactly one match and
        it is longer than 16 characters, it is split on single spaces instead.
        """
        matches = re.findall("Issue: (.*)?", data)
        if len(matches) == 1 and len(matches[0]) > 16:
            return matches[0].split(" ")
        return matches

    def getChangeLines(self, changeid):
        """Return the changed-line count of the current revision of ``changeid``.

        For every file except the commit message, the larger of its inserted
        and deleted line counts is added to the total.
        """
        # NOTE(review): ssl_verify=False disables TLS certificate verification.
        gclient = GerritClient(base_url=self.gerriturl, username=self.username,
                               password=self.passwd, ssl_verify=False)
        files = gclient.changes.get(changeid).get_revision("current").files.poll()
        change_line = 0
        for entry in files:
            if entry.get('path') == "/COMMIT_MSG":
                continue
            inserted = entry.get("lines_inserted")
            deleted = entry.get("lines_deleted")
            change_line += max(0 if inserted is None else inserted,
                               0 if deleted is None else deleted)
        return change_line

    # ------------------------------------------------------------------
    # Shared helpers for the concrete factories
    # ------------------------------------------------------------------
    @staticmethod
    def _format_timestamp(epoch_seconds):
        """Format an epoch timestamp as a local-time TIME_FORMAT string."""
        return datetime.datetime.fromtimestamp(epoch_seconds).strftime(TIME_FORMAT)

    def _listen(self, event_type, handler, label=None):
        """Stream ``event_type`` events to ``handler``, reconnecting on failure.

        Returns when the remote command reports an exit status. If the stream
        fails with an exception, the connection is closed and re-established
        after RECONNECT_DELAY_SECONDS. A failed (re)connect still terminates
        via connectGerrit()'s sys.exit(1).

        ``label`` is the prefix used when logging received events; it defaults
        to ``event_type``.
        """
        label = event_type if label is None else label
        while True:
            self.connectGerrit()
            try:
                self._consume(event_type, handler, label)
                return
            except Exception as e:
                self.logger.warning("listen stream event except: %s ... reconnecting" % e)
            finally:
                self._disconnect()
            time.sleep(RECONNECT_DELAY_SECONDS)

    def _consume(self, event_type, handler, label):
        """Read newline-delimited JSON events from the channel until it ends."""
        self.channel.exec_command("gerrit stream-events -s %s" % event_type)
        pending = b""
        while not self.channel.exit_status_ready():
            chunk = self.channel.recv(RECV_BYTES)
            if not chunk:
                # An empty read means the channel was closed by the peer.
                raise EOFError("gerrit closed the event stream")
            # One recv() may carry several events or only part of one, so
            # buffer the bytes and process complete lines only.
            pending += chunk
            *lines, pending = pending.split(b"\n")
            for line in lines:
                if line.strip():
                    self._dispatch(line, handler, label)

    def _dispatch(self, raw_line, handler, label):
        """Decode one event line and hand it to ``handler``.

        Errors are logged and swallowed so a single bad event does not stop
        the listener.
        """
        try:
            data = json.loads(raw_line.decode("utf-8"))
            self.logger.info("%s: \n%s", label, data)
            handler(data)
        except Exception:
            self.logger.exception("%s: failed to process event %r", label, raw_line)

    def _disconnect(self):
        """Close the session channel (if one was opened) and the SSH client."""
        channel = getattr(self, "channel", None)
        if channel is not None:
            channel.close()
        self.client.close()

    def _update_change(self, numberid, fields):
        """Apply ``fields`` to the Git row with ``numberid``.

        Returns True when a row was found and saved, False when none exists.
        """
        # Drop stale DB connections first; this process is long-running.
        close_old_connections()
        record = Git.objects.filter(numberid=numberid).first()
        if record is None:
            return False
        for attr, value in fields.items():
            setattr(record, attr, value)
        record.save()
        return True

    def _sync_status(self, data, ok_label, missing_label, extra=None):
        """Copy the event's status and event time (plus ``extra``) onto the change row."""
        change = data['change']
        numberid = change['number']
        fields = {
            "status": change['status'],
            "update_date": self._format_timestamp(data['eventCreatedOn']),
        }
        if extra:
            fields.update(extra)
        if self._update_change(numberid, fields):
            self.logger.info("%s: %s %s update ok", ok_label, numberid, change['id'])
        else:
            self.logger.error("%s: %s not exist in db", missing_label, numberid)


class CreatePatchSetFactory(GerritFactory):
    """Creates or updates the change row whenever a patch set is uploaded."""

    def get_stream_event(self):
        self._listen("patchset-created", self._save_patchset)

    def _save_patchset(self, data):
        """Update the existing row for the change, or create one if it is new."""
        change = data['change']
        patch_set = data['patchSet']

        changeid = change['id']
        numberid = change['number']
        new_revision = patch_set['revision']
        parents = patch_set['parents']
        parent = parents[0] if parents else ""
        subject = change['subject']
        status = change['status']
        commit_message = change['commitMessage']
        issue_list = json.dumps(self.getIssues(commit_message))
        commit_date = self._format_timestamp(change['createdOn'])
        update_date = self._format_timestamp(data['eventCreatedOn'])
        change_line = self.getChangeLines(changeid)

        updated = self._update_change(numberid, {
            "new_revision": new_revision,
            "subject": subject,
            "update_date": update_date,
            "commit_message": commit_message,
            "status": status,
            "issue_list": issue_list,
            "change_line": change_line,
        })
        if updated:
            self.logger.info("patchset-created-db: %s %s update ok", numberid, changeid)
            return

        # First patch set of this change: create the row. The "xxxxxx"
        # values are the placeholders the original code stores.
        new_data = {
            "changeid": changeid,
            "numberid": numberid,
            "new_revision": new_revision,
            "repo": change['project'],
            "branch": change['branch'],
            "parent": parent,
            "topic": change.get("topic", None),
            "subject": subject,
            "old_subject": "xxxxxx",
            "status": status,
            "commiter": "xxxxxx",
            "reviewer": "xxxxxx",
            "update_date": update_date,
            "issue_list": issue_list,
            "commit_message": commit_message,
            "author": change['owner']['username'],
            "commit_date": commit_date,
            "change_line": change_line,
            "submit_date": None,
            "submitter": None,
            "update_msg": None,
        }
        pc_serial = GitSerializer(data=new_data)
        if pc_serial.is_valid():
            pc_serial.save()
            self.logger.info("patchset-created-db: %s %s create ok", numberid, changeid)
        else:
            self.logger.error("patchset-created-db: %s %s create failed %s",
                              numberid, changeid, pc_serial.errors)


class ChangeAbandoneFactory(GerritFactory):
    """Records the new status when a change is abandoned."""

    def get_stream_event(self):
        self._listen("change-abandoned", self._on_abandoned)

    def _on_abandoned(self, data):
        self._sync_status(data, "change-abandoned-db", "change-abandoned-db")


class ChangeMergeFactory(GerritFactory):
    """Records status, submitter and submit date when a change is merged."""

    def get_stream_event(self):
        self._listen("change-merged", self._on_merged)

    def _on_merged(self, data):
        # The submit date is the event time, i.e. the same value as update_date.
        extra = {
            "submitter": data['submitter']['username'],
            "submit_date": self._format_timestamp(data['eventCreatedOn']),
        }
        self._sync_status(data, "change-merged-db", "change-merged-db", extra)


class ChangeRestoreFactory(GerritFactory):
    """Records the new status when an abandoned change is restored."""

    def get_stream_event(self):
        self._listen("change-restored", self._on_restored)

    def _on_restored(self, data):
        self._sync_status(data, "change-restored-db", "change-restored")


class TopicChangeFactory(GerritFactory):
    """Records the new topic when a change's topic is edited."""

    def get_stream_event(self):
        self._listen("topic-changed", self._on_topic_changed, label="topic-change")

    def _on_topic_changed(self, data):
        numberid = data['change']['number']
        fields = {
            "topic": data['change'].get("topic", None),
            "update_date": self._format_timestamp(data['eventCreatedOn']),
        }
        if self._update_change(numberid, fields):
            self.logger.info("topic-change-db: %s update ok", numberid)
        else:
            self.logger.error("topic-change: %s not exist in db", numberid)

3、启动服务

# 查看命令参数
python manage.py subscribe -h

# 启动单个服务
python manage.py subscribe "patchset-created"

# 或者启动所有服务
python manage.py subscribe all


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:springcloud如何使用Feign后台内部传递MultipartFile
下一篇:字符编码(字符编码在线转换)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~