Передо мной стояла задача быстрой вставки большого количества JSON в Postgres на удаленном сервере. Обычное соединение к СУБД и использование "INSERT" нужной скорости не давало, даже с "executemany". Решением стал комплекс действий, состоящий из следующих этапов.
Первое - это компактификация пересылаемых данных (в моем случае это JSON). Для этого я использовал модуль dict_tools, который заменяет ключи в словаре с данными на уникальные и максимально короткие, возвращая словарь с заменами, который потом можно использовать для возвращения первоначальных имен.
from dict_tools import names_generator, names_replace
aliases = names_generator()
tweets = [{'text': 'Hello, world'}, {'text': 'Bye, chaos'}]
shorten, replaced = names_replace(tweets, aliases=aliases)
Кроме того, превращая словарь в JSON-строку использовал опции "separators", "encoding" и "ensure_ascii" - меньше пробелов и нормальные символы в UTF-8 вместо escape-последовательностей типа \u01010 для юникода:
def dumps(d):
return json.dumps(d, separators=(',',':'), encoding='utf-8', ensure_ascii=False)
Второе - это отправка сжатых данных на сервер, где расположен Postgres. Для этого с одной стороны работает скрипт, использующий модуль requests и отправляющий данные. В словаре с замененными именами ключи и значения меняются местами, чтобы произвести обратную замену на принимающей стороне. Здесь "localhost:8888" просто для примера.
import requests
data = {
'snames': dumps({v: k for k, v in replaced.items()}),
'tweets': dumps(shorten)
}
r = requests.post('http://localhost:8888', data=data)
print(r.text)
На другой стороне висит минисервис на Tornado, принимающий присылаемые данные, возвращающий оригинальные ключи в JSON, размещающий данные построчно в CSV-файле и вызывающий команду Postgres "COPY".
import tornado.ioloop
import tornado.web
import json
import csv
import os
import psycopg2
from dict_tools import names_replace
class MainHandler(tornado.web.RequestHandler):
def post(self, *args, **kwargs):
tweets = json.loads(self.get_argument('tweets'))
snames = json.loads(self.get_argument('snames'))
tweets, replaced_names = names_replace(tweets, repl=snames)
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dump.csv')
fieldnames = ['id', 'src']
with open(path, 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for tweet in tweets:
writer.writerow({
'id': tweet['id'],
'src': json.dumps(tweet, separators=(',',':'), encoding='utf-8', ensure_ascii=False).encode('utf-8')
})
con = psycopg2.connect(host='localhost', port=5432, user='XXX', password='XXX', database='XXX')
cur = con.cursor()
cur.execute("COPY dump_table(id, src) FROM '{}' WITH CSV HEADER".format(path))
con.commit()
con.close()
self.write(u'Saved {} items'.format(len(tweets)))
if __name__ == "__main__":
application = tornado.web.Application([(r"/", MainHandler)])
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
Такая схема у меня работает в несколько раз быстрее, чем вставка с удаленного сервера.