1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- # encoding: utf-8
- import sys
- import pymysql.cursors
- import psycopg2
- import time
- reload(sys)
- sys.setdefaultencoding('utf-8')
- def mysql_connect(host, db, user, password):
- try:
- mysql_conn = pymysql.connect(host=host,
- user=user,
- password=password,
- db=db,
- charset='utf8mb4',
- cursorclass=pymysql.cursors.DictCursor)
- return mysql_conn
- except:
- print('连接mysql失败!')
- pass
- def postgres_connect(host, port, db, user, password):
- try:
- postgres_conn = psycopg2.connect(database=db, user=user, password=password, host=host, port=port)
- return postgres_conn
- except:
- print('连接Postgres失败!')
- pass
- if __name__ == '__main__':
- # 获取postgres连接
- postgres_conn = postgres_connect('127.0.0.1', '5432', 'nwaycc', 'postgres', 'Nway2017')
- # 获取mysql连接
- mysql_conn = mysql_connect('47.94.228.245', 'swdz_crm', 'dbuser', 'BXhKw6JRxMy9FTb2')
- # try:
- postgres_cur = postgres_conn.cursor()
- mysql_cur = mysql_conn.cursor()
- ip = '172.31.20.135'
- mysql_cur.execute(
- """select cdr_id from call_records where ip = %s order by id desc limit 1""",
- (ip)
- )
- last_id = mysql_cur.fetchone()
- last_id = last_id['cdr_id'] if last_id else 0
- # 获取要同步的通话详单
- postgres_cur.execute("""select caller,start_time,end_time,record_path,hangup_dispostion,call_id,id,term_status,intention,task_id from ai_cdr where id > %s order by id asc""",
- ([last_id]))
- call_records = postgres_cur.fetchall()
- for call_record in call_records:
- postgres_cur.execute("""select tk_name from ai_task where id = %s""",
- ([call_record[9]]))
- task_name = postgres_cur.fetchone()
- tk_id = call_record[9]
- task_name = task_name['tk_name'] if task_name else '任务' + str(tk_id)
- record_path = ip+':8088/recordings/'+call_record[3]
- mysql_cur.execute(
- """insert into call_records(phone, start_time, end_time, record_path, hangup_dispostion, call_id, cdr_id, term_status,intention, ip,tag) value (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
- (call_record[0], call_record[1], call_record[2], record_path,
- call_record[4], call_record[5], call_record[6], call_record[7], call_record[8], ip,task_name)
- )
- mysql_conn.commit()
- # 获取通话过程
- postgres_cur.execute(
- """select ai_speak, people_speak, call_id, voice_path, flow_time, node_name from ai_flow_content where call_id = %s""",
- ([call_record[5]]))
- call_records_processes = postgres_cur.fetchall()
- for call_records_process in call_records_processes:
- str1 = call_records_process[0]
- str2 = call_records_process[3]
- str1 = str1.replace('/opt/fsgui/nway-sys', ip+':8088')
- str2 = str2.replace('/opt/fsgui/nway-sys', ip+':8088')
- mysql_cur.execute(
- """insert into call_records_process(ai_speak, people_speak, call_id, voice_path, flow_time, node_name, ip) value (%s, %s, %s, %s, %s, %s, %s)""",
- (str1, call_records_process[1], call_records_process[2], str2,
- call_records_process[4], call_records_process[5], ip)
- )
- mysql_conn.commit()
- mysql_conn.close()
- postgres_conn.close()
- # except:
- # print "Error: insert data error"
- # pass
|