2015-07-17

Быстрая переброска данных в удаленный Postgres

Передо мной стояла задача быстрой вставки большого количества JSON в Postgres на удаленном сервере. Обычное соединение к СУБД и использование "INSERT" нужной скорости не давало, даже с "executemany". Решением стал комплекс действий, состоящий из следующих этапов.

Первое - это компактификация пересылаемых данных (в моем случае это JSON). Для этого я использовал модуль dict_tools, который заменяет ключи в словаре с данными на уникальные и максимально короткие, возвращая словарь с заменами, который потом можно использовать для возвращения первоначальных имен.


from dict_tools import names_generator, names_replace
aliases = names_generator()
tweets = [{'text': 'Hello, world'}, {'text': 'Bye, chaos'}]
shorten, replaced = names_replace(tweets, aliases=aliases)

Кроме того, превращая словарь в JSON-строку использовал опции "separators", "encoding" и "ensure_ascii" - меньше пробелов и нормальные символы в UTF-8 вместо escape-последовательностей типа \u01010 для юникода:


def dumps(d):
    return json.dumps(d, separators=(',',':'), encoding='utf-8', ensure_ascii=False)

Второе - это отправка сжатых данных на сервер, где расположен Postgres. Для этого с одной стороны работает скрипт, использующий модуль requests и отправляющий данные. В словаре с замененными именами ключи и значения меняются местами, чтобы произвести обратную замену на принимающей стороне. Здесь "localhost:8888" просто для примера.


import requests
data = {
    'snames': dumps({v: k for k, v in replaced.items()}),
    'tweets': dumps(shorten)
}
r = requests.post('http://localhost:8888', data=data)
print(r.text)

На другой стороне висит минисервис на Tornado, принимающий присылаемые данные, возвращающий оригинальные ключи в JSON, размещающий данные построчно в CSV-файле и вызывающий команду Postgres "COPY".


import tornado.ioloop
import tornado.web
import json
import csv
import os
import psycopg2
from dict_tools import names_replace

class MainHandler(tornado.web.RequestHandler):
    def post(self, *args, **kwargs):

        tweets = json.loads(self.get_argument('tweets'))
        snames = json.loads(self.get_argument('snames'))
        tweets, replaced_names =  names_replace(tweets, repl=snames)

        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dump.csv')

        fieldnames = ['id', 'src']
        with open(path, 'w') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for tweet in tweets:
                writer.writerow({
                    'id': tweet['id'],
                    'src': json.dumps(tweet, separators=(',',':'), encoding='utf-8', ensure_ascii=False).encode('utf-8')
                })

        con = psycopg2.connect(host='localhost', port=5432, user='XXX', password='XXX', database='XXX')
        cur = con.cursor()
        cur.execute("COPY dump_table(id, src) FROM '{}' WITH CSV HEADER".format(path))
        con.commit()
        con.close()

        self.write(u'Saved {} items'.format(len(tweets)))

if __name__ == "__main__":
    application = tornado.web.Application([(r"/", MainHandler)])
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Такая схема у меня работает в несколько раз быстрее, чем вставка с удаленного сервера.

2015-07-15

Postgres + Python = Awesome

Чем больше работаю с Postgres, тем больше радует меня эта СУБД. Шикарная работа с JSON, позволяющая хранить структуры разнотипных данных в полях JSON и JSONB, и даже вешать на них индексы для быстрого поиска внутри них - находка для архивирования твитов и им подобных. Отличный набор функций для обработки текста, включая regexp. Типы-массивы (тоже с индексами). Переменных-таблиц нет, но их с успехом заменяют переменные-массивы или временные таблицы. Вот некоторые выражения, приводящие меня в щенячий восторг:


    -- Новая таблица по образцу выборки
    CREATE TABLE new_table AS SELECT field1, field2 FROM old_table LIMIT 10;

    -- Временная самоликвидирующаяся таблица
    CREATE TEMP TABLE IF NOT EXISTS new_table(id INT, data JSON) ON COMMIT DROP;

    -- Одно и то-же, во втором случае используется массив
    SELECT * FROM old_table WHERE id IN (1,2,3);
    SELECT * FROM old_table WHERE id = ANY('{1,2,3}');

    -- Создание массива в процессе группировки
    SELECT array_agg(text_id) FROM old_table WHERE id IN (1,2,3);

    -- JSON-массив JSON-объектов
    SELECT json_agg(t.jobj) FROM (
        SELECT json_build_object('text_id', id, 'record_id', _id) jobj
        FROM old_table
    ) t;

    -- Cоотношение твитов с вложенными чувствами и черствых:
    -- на выходе что-то вроде "{ "false" : 9115, "true" : 166 }"
    SELECT json_object_agg(t.ps, t.c) FROM (
        SELECT src#>>'{possibly_sensitive}' ps, COUNT(_id) c
        FROM tweet_archive
        WHERE src#>>'{possibly_sensitive}' = ANY('{false,true}') -- а тут массив с текстами
        GROUP BY src#>>'{possibly_sensitive}'
    ) t;

Для работы с Postgres используется Python и стандартная библиотека json. Для компактности и корректной работы с не-ASCII символами внутри JSON-полей Postgres при упаковке твитов используются такие параметры:

    json.dumps(tweet, separators=(',',':'), encoding='utf-8', ensure_ascii=False)

Еще одна возможность, которая может пригодиться - создание процедур в Postgres, используя Python. Думаю, примера будет достаточно, чтобы понять, как это делается (конечно официальной документации никто не отменял):


    CREATE OR REPLACE FUNCTION outer_procedure(p_limit INT) RETURNS VOID AS $$
        # Добавление пути для импорта нашего модуля
        import sys
        sys.path.insert(0, '/home/developer/plpython_func/')

        # Загрузка модуля
        import test_functions

        # На время разработки, пока модуль будет меняться,
        # его нужно насильно перезагружать, т.к. он кешируется
        reload(test_functions)

        # Собственно, запуск функции из модуля.
        # plpy - это объект для взаимодействия с БД
        test_functions.run(plpy, p_limit)

        return None
    $$ LANGUAGE plpythonu VOLATILE STRICT;

Внешняя процедура на Python (test_functions.py), которую мы загружаем:


    # -*- coding: utf-8 -*-

    import traceback

    def run(plpy, limit):

        # Готовим план выполнения
        sql = 'INSERT INTO new_table(id, text) VALUES($1, $2)'
        plan = plpy.prepare(sql, ["int", "text"])

        # Выполняем вставку внутри транзакции, ловим ошибки
        try:
            with plpy.subtransaction():
                plpy.execute(plan, [1, "text"])
        except plpy.SPIError as e:
            plpy.error("Error inside transaction: %s" % e.args)
        except:
            plpy.error(traceback.format_exc(10))