2015-07-17

Быстрая переброска данных в удаленный Postgres

Передо мной стояла задача быстрой вставки большого количества JSON в Postgres на удаленном сервере. Обычное соединение к СУБД и использование "INSERT" нужной скорости не давало, даже с "executemany". Решением стал комплекс действий, состоящий из следующих этапов.

Первое - это компактификация пересылаемых данных (в моем случае это JSON). Для этого я использовал модуль dict_tools, который заменяет ключи в словаре с данными на уникальные и максимально короткие, возвращая словарь с заменами, который потом можно использовать для возвращения первоначальных имен.


from dict_tools import names_generator, names_replace
aliases = names_generator()
tweets = [{'text': 'Hello, world'}, {'text': 'Bye, chaos'}]
shorten, replaced = names_replace(tweets, aliases=aliases)

Кроме того, превращая словарь в JSON-строку использовал опции "separators", "encoding" и "ensure_ascii" - меньше пробелов и нормальные символы в UTF-8 вместо escape-последовательностей типа \u01010 для юникода:


def dumps(d):
    return json.dumps(d, separators=(',',':'), encoding='utf-8', ensure_ascii=False)

Второе - это отправка сжатых данных на сервер, где расположен Postgres. Для этого с одной стороны работает скрипт, использующий модуль requests и отправляющий данные. В словаре с замененными именами ключи и значения меняются местами, чтобы произвести обратную замену на принимающей стороне. Здесь "localhost:8888" просто для примера.


import requests
data = {
    'snames': dumps({v: k for k, v in replaced.items()}),
    'tweets': dumps(shorten)
}
r = requests.post('http://localhost:8888', data=data)
print(r.text)

На другой стороне висит минисервис на Tornado, принимающий присылаемые данные, возвращающий оригинальные ключи в JSON, размещающий данные построчно в CSV-файле и вызывающий команду Postgres "COPY".


import tornado.ioloop
import tornado.web
import json
import csv
import os
import psycopg2
from dict_tools import names_replace

class MainHandler(tornado.web.RequestHandler):
    def post(self, *args, **kwargs):

        tweets = json.loads(self.get_argument('tweets'))
        snames = json.loads(self.get_argument('snames'))
        tweets, replaced_names =  names_replace(tweets, repl=snames)

        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dump.csv')

        fieldnames = ['id', 'src']
        with open(path, 'w') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for tweet in tweets:
                writer.writerow({
                    'id': tweet['id'],
                    'src': json.dumps(tweet, separators=(',',':'), encoding='utf-8', ensure_ascii=False).encode('utf-8')
                })

        con = psycopg2.connect(host='localhost', port=5432, user='XXX', password='XXX', database='XXX')
        cur = con.cursor()
        cur.execute("COPY dump_table(id, src) FROM '{}' WITH CSV HEADER".format(path))
        con.commit()
        con.close()

        self.write(u'Saved {} items'.format(len(tweets)))

if __name__ == "__main__":
    application = tornado.web.Application([(r"/", MainHandler)])
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Такая схема у меня работает в несколько раз быстрее, чем вставка с удаленного сервера.

Комментариев нет: