# encoding: utf-8
"""Copy new call detail records from the local Postgres store into MySQL.

For the configured source IP the script looks up the last ``cdr_id`` already
present in MySQL ``call_records``, reads every newer row from Postgres
``ai_cdr``, and inserts each one into ``call_records`` together with its
conversation steps (``ai_flow_content`` -> ``call_records_process``).
"""
import sys
import time

import psycopg2
import pymysql.cursors

if sys.version_info[0] < 3:
    # Python 2 only: force the implicit str/unicode codec to UTF-8 so the
    # non-ASCII literals below can be mixed with database text.
    reload(sys)  # noqa: F821 - builtin on Python 2
    sys.setdefaultencoding('utf-8')

# NOTE(review): hosts and passwords are hard-coded in source. They are kept
# as-is to preserve behaviour, but should be moved to configuration or
# environment variables.
POSTGRES_SETTINGS = ('127.0.0.1', '5432', 'nwaycc', 'postgres', 'Nway2017')
MYSQL_SETTINGS = ('47.94.228.245', 'swdz_crm', 'dbuser', 'BXhKw6JRxMy9FTb2')

# IP stored with every synced row and used to build the media URLs.
SOURCE_IP = '172.31.20.135'

# Prefix that is replaced by "<ip>:8088" in stored paths.
LOCAL_MEDIA_ROOT = '/opt/fsgui/nway-sys'
MEDIA_PORT_SUFFIX = ':8088'


def mysql_connect(host, db, user, password):
    """Open a MySQL connection whose cursors return rows as dicts.

    Returns the connection, or ``None`` after printing a message when the
    driver reports a connection error.
    """
    try:
        return pymysql.connect(host=host,
                               user=user,
                               password=password,
                               db=db,
                               charset='utf8mb4',
                               cursorclass=pymysql.cursors.DictCursor)
    except pymysql.MySQLError as exc:
        print('连接mysql失败!')
        sys.stderr.write('%r\n' % (exc,))
        return None


def postgres_connect(host, port, db, user, password):
    """Open a Postgres connection.

    Returns the connection, or ``None`` after printing a message when the
    driver reports a connection error.
    """
    try:
        return psycopg2.connect(database=db, user=user, password=password,
                                host=host, port=port)
    except psycopg2.Error as exc:
        print('连接Postgres失败!')
        sys.stderr.write('%r\n' % (exc,))
        return None


def _to_served_path(path, ip):
    """Swap the local install prefix for ``<ip>:8088``; NULL passes through."""
    if path is None:
        return None
    return path.replace(LOCAL_MEDIA_ROOT, ip + MEDIA_PORT_SUFFIX)


def _recording_url(record_file, ip):
    """Build ``<ip>:8088/recordings/<file>``; NULL passes through."""
    if record_file is None:
        return None
    return ip + MEDIA_PORT_SUFFIX + '/recordings/' + record_file


def _task_name(postgres_cur, task_id, cache):
    """Return the name of task ``task_id``, or a generated fallback label.

    Results are memoised in ``cache`` so each task is queried once per run.
    """
    if task_id not in cache:
        postgres_cur.execute(
            """select tk_name from ai_task where id = %s""", (task_id,))
        row = postgres_cur.fetchone()
        # The default psycopg2 cursor returns tuples, so index by position.
        cache[task_id] = row[0] if row else '任务' + str(task_id)
    return cache[task_id]


def _sync_call_records(postgres_conn, mysql_conn, ip):
    """Copy every ``ai_cdr`` row newer than the last synced one into MySQL.

    Each call record and its process rows are committed together, so an
    interrupted run never leaves a record without its process rows and the
    next run resumes from the last fully synced ``cdr_id``.

    Returns the number of call records copied.
    """
    postgres_cur = postgres_conn.cursor()
    mysql_cur = mysql_conn.cursor()

    mysql_cur.execute(
        """select cdr_id from call_records where ip = %s order by id desc limit 1""",
        (ip,)
    )
    last_row = mysql_cur.fetchone()
    last_id = last_row['cdr_id'] if last_row else 0

    # Fetch the call detail records that still need to be synced.
    postgres_cur.execute(
        """select caller,start_time,end_time,record_path,hangup_dispostion,call_id,id,term_status,intention,task_id from ai_cdr where id > %s order by id asc""",
        (last_id,))
    call_records = postgres_cur.fetchall()

    task_names = {}
    for call_record in call_records:
        (caller, start_time, end_time, record_file, hangup, call_id,
         cdr_id, term_status, intention, task_id) = call_record

        # Eleven columns need eleven placeholders.
        mysql_cur.execute(
            """insert into call_records(phone, start_time, end_time, record_path, hangup_dispostion, call_id, cdr_id,
            term_status,intention, ip,tag) value (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
            (caller, start_time, end_time, _recording_url(record_file, ip),
             hangup, call_id, cdr_id, term_status, intention, ip,
             _task_name(postgres_cur, task_id, task_names))
        )

        # Fetch the conversation steps of this call.
        postgres_cur.execute(
            """select ai_speak, people_speak, call_id, voice_path, flow_time, node_name from ai_flow_content where call_id = %s""",
            (call_id,))
        for step in postgres_cur.fetchall():
            ai_speak, people_speak, step_call_id, voice_path, flow_time, node_name = step
            mysql_cur.execute(
                """insert into call_records_process(ai_speak, people_speak, call_id, voice_path, flow_time, node_name, ip)
                value (%s, %s, %s, %s, %s, %s, %s)""",
                (_to_served_path(ai_speak, ip), people_speak, step_call_id,
                 _to_served_path(voice_path, ip), flow_time, node_name, ip)
            )

        # One commit per call record, after its process rows.
        mysql_conn.commit()

    return len(call_records)


def main():
    """Connect to both databases, run the sync and always close connections.

    Returns a process exit status: 0 on success, 1 when a connection could
    not be established.
    """
    postgres_conn = postgres_connect(*POSTGRES_SETTINGS)
    if postgres_conn is None:
        return 1
    try:
        mysql_conn = mysql_connect(*MYSQL_SETTINGS)
        if mysql_conn is None:
            return 1
        try:
            _sync_call_records(postgres_conn, mysql_conn, SOURCE_IP)
        finally:
            # Closing without commit discards any half-written record.
            mysql_conn.close()
    finally:
        postgres_conn.close()
    return 0


if __name__ == '__main__':
    sys.exit(main())