wesley 6 years ago
parent
commit
5f64118805
2 changed files with 176 additions and 0 deletions
  1. 88 0
      pyshell/mysql_sync_psql.py
  2. 88 0
      pyshell/psql_sync_mysql.py

+ 88 - 0
pyshell/mysql_sync_psql.py

xqd
@@ -0,0 +1,88 @@
+# encoding: utf-8
+import sys
+import pymysql.cursors
+import psycopg2
+import time
+
+reload(sys)
+sys.setdefaultencoding('utf-8')
+
+
+def mysql_connect(host, db, user, password):
+try:
+mysql_conn = pymysql.connect(host=host,
+user=user,
+password=password,
+db=db,
+charset='utf8mb4',
+cursorclass=pymysql.cursors.DictCursor)
+return mysql_conn
+except:
+print('连接mysql失败!')
+pass
+
+
+def postgres_connect(host, port, db, user, password):
+try:
+postgres_conn = psycopg2.connect(database=db, user=user, password=password, host=host, port=port)
+return postgres_conn
+except:
+print('连接Postgres失败!')
+pass
+
+
+if __name__ == '__main__':
+# 获取postgres连接
+postgres_conn = postgres_connect('127.0.0.1', '5432', 'nwaycc', 'postgres', 'Nway2017')
+
+# 获取mysql连接
+mysql_conn = mysql_connect('47.94.228.245', 'swdz_crm', 'dbuser', 'BXhKw6JRxMy9FTb2')
+
+# try:
+# 从mysql获取需要导入的电话号码
+mysql_cur = mysql_conn.cursor()
+mysql_sql_select = "SELECT phone,id from call_list WHERE sync = 0"
+mysql_cur.execute(mysql_sql_select)
+nums = mysql_cur.fetchall()
+
+postgres_cur = postgres_conn.cursor()
+group_name = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+# 生成号码组
+if nums:
+psql_insert_group = "INSERT INTO ai_number_group(group_name, own_id) VALUES ('" + group_name + "', 1) RETURNING id"
+postgres_cur.execute(psql_insert_group)
+postgres_conn.commit()
+group_id = postgres_cur.fetchone()[0]
+
+# 插入号码
+for num in nums:
+# 检查号码是否已经导入
+psql_select_num = "SELECT dst_number from ai_numbers WHERE dst_number = '" + num['phone'] + "' "
+postgres_cur.execute(psql_select_num)
+rows = postgres_cur.fetchall()
+
+# 更新mysql中该号码状态
+mysql_sql_update = "UPDATE call_list SET sync=1 WHERE id= %s"
+mysql_cur.execute(mysql_sql_update % (num['id']))
+mysql_conn.commit()
+
+if (rows):
+continue
+else:
+# 插入号码到ai_numbers
+psql_insert_num = "INSERT INTO ai_numbers(dst_number) VALUES ('" + num['phone'] + "') RETURNING id"
+postgres_cur.execute(psql_insert_num)
+postgres_conn.commit()
+num_id = postgres_cur.fetchone()[0]
+
+# 将导入的号码添加到号码组
+psql_insert_group_map = "INSERT INTO ai_number_group_map(group_id, number_id) VALUES ('" + str(
+group_id) + "', '" + str(num_id) + "')"
+postgres_cur.execute(psql_insert_group_map)
+postgres_conn.commit()
+
+mysql_conn.close()
+postgres_conn.close()
+# except:
+#     print "Error: insert data error"
+#     pass

+ 88 - 0
pyshell/psql_sync_mysql.py

xqd
@@ -0,0 +1,88 @@
+# encoding: utf-8
+import sys
+import pymysql.cursors
+import psycopg2
+import time
+
+reload(sys)
+sys.setdefaultencoding('utf-8')
+
+
+def mysql_connect(host, db, user, password):
+try:
+mysql_conn = pymysql.connect(host=host,
+user=user,
+password=password,
+db=db,
+charset='utf8mb4',
+cursorclass=pymysql.cursors.DictCursor)
+return mysql_conn
+except:
+print('连接mysql失败!')
+pass
+
+
+def postgres_connect(host, port, db, user, password):
+try:
+postgres_conn = psycopg2.connect(database=db, user=user, password=password, host=host, port=port)
+return postgres_conn
+except:
+print('连接Postgres失败!')
+pass
+
+
+if __name__ == '__main__':
+# 获取postgres连接
+postgres_conn = postgres_connect('127.0.0.1', '5432', 'nwaycc', 'postgres', 'Nway2017')
+
+# 获取mysql连接
+mysql_conn = mysql_connect('47.94.228.245', 'swdz_crm', 'dbuser', 'BXhKw6JRxMy9FTb2')
+
+# try:
+postgres_cur = postgres_conn.cursor()
+mysql_cur = mysql_conn.cursor()
+ip = '172.31.20.135'
+
+mysql_cur.execute(
+"""select cdr_id from call_records  where ip = %s order by id desc limit 1""",
+(ip)
+)
+last_id = mysql_cur.fetchone()
+last_id = last_id['cdr_id'] if last_id else 0
+# 获取要同步的通话详单
+postgres_cur.execute("""select caller,start_time,end_time,record_path,hangup_dispostion,call_id,id,term_status,intention from ai_cdr  where id > %s order by id asc""",
+([last_id]))
+call_records = postgres_cur.fetchall()
+
+for call_record in call_records:
+record_path = ip+':8088/recordings/'+call_record[3]
+mysql_cur.execute(
+"""insert into  call_records(phone, start_time, end_time, record_path, hangup_dispostion, call_id, cdr_id, term_status,intention, ip) value (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
+(call_record[0], call_record[1], call_record[2], record_path,
+call_record[4], call_record[5], call_record[6], call_record[7], call_record[8], ip)
+)
+mysql_conn.commit()
+
+# 获取通话过程
+postgres_cur.execute(
+"""select ai_speak, people_speak, call_id, voice_path, flow_time, node_name from ai_flow_content  where call_id =  %s""",
+([call_record[5]]))
+call_records_processes = postgres_cur.fetchall()
+for call_records_process in call_records_processes:
+str1 = call_records_process[0]
+str2 = call_records_process[3]
+str1 = str1.replace('/opt/fsgui/nway-sys', ip+':8088')
+str2 = str2.replace('/opt/fsgui/nway-sys', ip+':8088')
+mysql_cur.execute(
+"""insert into  call_records_process(ai_speak, people_speak, call_id, voice_path, flow_time, node_name, ip) value (%s, %s, %s, %s, %s, %s, %s)""",
+(str1, call_records_process[1], call_records_process[2], str2,
+call_records_process[4], call_records_process[5], ip)
+)
+mysql_conn.commit()
+
+
+mysql_conn.close()
+postgres_conn.close()
+# except:
+#     print "Error: insert data error"
+#     pass