Показаны сообщения с ярлыком postgresql. Показать все сообщения
Показаны сообщения с ярлыком postgresql. Показать все сообщения

2016-01-19

Postgres: generate the most large followers intersections

Задача: получить список пользователей твиттера, с которыми у одного из них есть общие фоловеры, и отсортировать по их количеству. Лучше всего продемонстрировать на примере. Создадим таблицу с идентификаторами пользователей и фоловеров:


CREATE TEMP TABLE f(uid INT, fid INT);
INSERT INTO f(uid, fid) VALUES
 (1, 10), (1, 11), (1, 12), (1, 13), (1, 14),
 (2, 10), (2, 11), (2, 12),
 (3, 13), (3, 14),
 (4, 13), (4, 14), (4, 10), (4, 11)
;

А вот собственно и запрос, интересуют пересечения с пользователем 2, самые большие сверху.


SELECT uid, array_agg(fid) followers_intersect
FROM f GROUP BY uid
ORDER BY cardinality(
 (
  SELECT array_agg(a1) FROM unnest(array_agg(fid)) a1
  INNER JOIN unnest(
   (SELECT array_agg(fid) FROM f WHERE uid=2)::INT[]
  ) a2
  ON a1=a2
 )::INT[]
) DESC NULLS LAST;

Результат, как и ожидалось:

  • 1,"{10,11,12,13,14}"
  • 2,"{10,11,12}"
  • 4,"{13,14,10,11}"
  • 3,"{13,14}"

2015-07-17

Быстрая переброска данных в удаленный Postgres

Передо мной стояла задача быстрой вставки большого количества JSON в Postgres на удаленном сервере. Обычное соединение к СУБД и использование "INSERT" нужной скорости не давало, даже с "executemany". Решением стал комплекс действий, состоящий из следующих этапов.

Первое - это компактификация пересылаемых данных (в моем случае это JSON). Для этого я использовал модуль dict_tools, который заменяет ключи в словаре с данными на уникальные и максимально короткие, возвращая словарь с заменами, который потом можно использовать для возвращения первоначальных имен.


from dict_tools import names_generator, names_replace
aliases = names_generator()
tweets = [{'text': 'Hello, world'}, {'text': 'Bye, chaos'}]
shorten, replaced = names_replace(tweets, aliases=aliases)

Кроме того, превращая словарь в JSON-строку использовал опции "separators", "encoding" и "ensure_ascii" - меньше пробелов и нормальные символы в UTF-8 вместо escape-последовательностей типа \u01010 для юникода:


def dumps(d):
    return json.dumps(d, separators=(',',':'), encoding='utf-8', ensure_ascii=False)

Второе - это отправка сжатых данных на сервер, где расположен Postgres. Для этого с одной стороны работает скрипт, использующий модуль requests и отправляющий данные. В словаре с замененными именами ключи и значения меняются местами, чтобы произвести обратную замену на принимающей стороне. Здесь "localhost:8888" просто для примера.


import requests
data = {
    'snames': dumps({v: k for k, v in replaced.items()}),
    'tweets': dumps(shorten)
}
r = requests.post('http://localhost:8888', data=data)
print(r.text)

На другой стороне висит минисервис на Tornado, принимающий присылаемые данные, возвращающий оригинальные ключи в JSON, размещающий данные построчно в CSV-файле и вызывающий команду Postgres "COPY".


import tornado.ioloop
import tornado.web
import json
import csv
import os
import psycopg2
from dict_tools import names_replace

class MainHandler(tornado.web.RequestHandler):
    def post(self, *args, **kwargs):

        tweets = json.loads(self.get_argument('tweets'))
        snames = json.loads(self.get_argument('snames'))
        tweets, replaced_names =  names_replace(tweets, repl=snames)

        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dump.csv')

        fieldnames = ['id', 'src']
        with open(path, 'w') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for tweet in tweets:
                writer.writerow({
                    'id': tweet['id'],
                    'src': json.dumps(tweet, separators=(',',':'), encoding='utf-8', ensure_ascii=False).encode('utf-8')
                })

        con = psycopg2.connect(host='localhost', port=5432, user='XXX', password='XXX', database='XXX')
        cur = con.cursor()
        cur.execute("COPY dump_table(id, src) FROM '{}' WITH CSV HEADER".format(path))
        con.commit()
        con.close()

        self.write(u'Saved {} items'.format(len(tweets)))

if __name__ == "__main__":
    application = tornado.web.Application([(r"/", MainHandler)])
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Такая схема у меня работает в несколько раз быстрее, чем вставка с удаленного сервера.

2015-07-15

Postgres + Python = Awesome

Чем больше работаю с Postgres, тем больше радует меня эта СУБД. Шикарная работа с JSON, позволяющая хранить структуры разнотипных данных в полях JSON и JSONB, и даже вешать на них индексы для быстрого поиска внутри них - находка для архивирования твитов и им подобных. Отличный набор функций для обработки текста, включая regexp. Типы-массивы (тоже с индексами). Переменных-таблиц нет, но их с успехом заменяют переменные-массивы или временные таблицы. Вот некоторые выражения, приводящие меня в щенячий восторг:


    -- Новая таблица по образцу выборки
    CREATE TABLE new_table AS SELECT field1, field2 FROM old_table LIMIT 10;

    -- Временная самоликвидирующаяся таблица
    CREATE TEMP TABLE IF NOT EXISTS new_table(id INT, data JSON) ON COMMIT DROP;

    -- Одно и то-же, во втором случае используется массив
    SELECT * FROM old_table WHERE id IN (1,2,3);
    SELECT * FROM old_table WHERE id = ANY('{1,2,3}');

    -- Создание массива в процессе группировки
    SELECT array_agg(text_id) FROM old_table WHERE id IN (1,2,3);

    -- JSON-массив JSON-объектов
    SELECT json_agg(t.jobj) FROM (
        SELECT json_build_object('text_id', id, 'record_id', _id) jobj
        FROM old_table
    ) t;

    -- Cоотношение твитов с вложенными чувствами и черствых:
    -- на выходе что-то вроде "{ "false" : 9115, "true" : 166 }"
    SELECT json_object_agg(t.ps, t.c) FROM (
        SELECT src#>>'{possibly_sensitive}' ps, COUNT(_id) c
        FROM tweet_archive
        WHERE src#>>'{possibly_sensitive}' = ANY('{false,true}') -- а тут массив с текстами
        GROUP BY src#>>'{possibly_sensitive}'
    ) t;

Для работы с Postgres используется Python и стандартная библиотека json. Для компактности и корректной работы с не-ASCII символами внутри JSON-полей Postgres при упаковке твитов используются такие параметры:

    json.dumps(tweet, separators=(',',':'), encoding='utf-8', ensure_ascii=False)

Еще одна возможность, которая может пригодиться - создание процедур в Postgres, используя Python. Думаю, примера будет достаточно, чтобы понять, как это делается (конечно официальной документации никто не отменял):


    CREATE OR REPLACE FUNCTION outer_procedure(p_limit INT) RETURNS VOID AS $$
        # Добавление пути для импорта нашего модуля
        import sys
        sys.path.insert(0, '/home/developer/plpython_func/')

        # Загрузка модуля
        import test_functions

        # На время разработки, пока модуль будет меняться,
        # его нужно насильно перезагружать, т.к. он кешируется
        reload(test_functions)

        # Собственно, запуск функции из модуля.
        # plpy - это объект для взаимодействия с БД
        test_functions.run(plpy, p_limit)

        return None
    $$ LANGUAGE plpythonu VOLATILE STRICT;

Внешняя процедура на Python (test_functions.py), которую мы загружаем:


    # -*- coding: utf-8 -*-

    import traceback

    def run(plpy, limit):

        # Готовим план выполнения
        sql = 'INSERT INTO new_table(id, text) VALUES($1, $2)'
        plan = plpy.prepare(sql, ["int", "text"])

        # Выполняем вставку внутри транзакции, ловим ошибки
        try:
            with plpy.subtransaction():
                plpy.execute(plan, [1, "text"])
        except plpy.SPIError as e:
            plpy.error("Error inside transaction: %s" % e.args)
        except:
            plpy.error(traceback.format_exc(10))

2015-02-26

PostgreSQL: как получить несколько выборок (роусетов)

Переводя проект с использования AzureSQL (MSSQL) на PostgreSQL я столкнулся с тем, что последний не умеет возвращать результаты нескольких запросов SELECT (так называемые роусеты, т.е. наборы строк). К примеру, некая процедура генерирует SELECT из нескольких таблиц, чтобы отобразить отчет по этой сборной солянке. Так вот, в отличии от MSSQL, PostgreSQL возвращает только последий из них, такая уж особенность. Странно, но факт.

Что делать? Мне в голову приходили три варианта.

Один из них - создать в процедуре временные таблицы с гарантированно уникальными именами, сложить наборы строк в них и вернуть их список. На клиенте соответственно понадобится сделать запросы к этим таблицам и при необходимости их уничтожить. Минусов у такого подхода полно, взять хотя бы увеличение количества запросов к БД и возможные коллизии в случае забывания удаления временных таблиц (хотя это решаемо).

Второй способ - генерировать бинарные строки специфического формата (к примеру, Паскалевскую строку) из всех выборок и вернуть один SELECT с этими бинарниками. Минус - сложность при создании и распаковке на клиенте, хотя и позволяет получить сразу все данные одним запросом, к тому же очень компактно.

И наконец, третий вариант, который работает в PostgreSQL благодаря поддержке типа данных JSON. Думаю, простой демонстрации кода будет достаточно, чтобы понять, как это работает:

SELECT json_agg(json_build_array(r.id, r.title))
FROM (SELECT id, title FROM queries LIMIT 3) r

Результат:

[
    [1, "Query 1"],
    [2, "Query 2"],
    [3, "Query 3"],
]

Запрос:

SELECT json_build_array(
    (SELECT json_agg(json_build_array(r.id, r.title))
    FROM (SELECT id, title FROM queries LIMIT 2) r),
    (SELECT json_agg(json_build_array(r.id, r.name))
    FROM (SELECT id, name FROM companies LIMIT 2 OFFSET 2) r)
);

Результат:

[
    [
        [1, "Query 1"],
        [2, "Query 2"]
    ],
    [
        [3, "Company 1"],
        [4, "Company 2"]
    ]
]