kubo39's blog

ただの雑記です。

mysql-nativeでbulk insertをする

最近はMySQLを触っている中で、なにかしらメモを残しておくかという気分になった。

mysql-nativeにはbulk insert用のサポートがあるわけではないが、メタプログラミングコンパイル時にクエリをある程度生成できたりすると便利だなあと思うなどした。

  • 環境
$ dmd --version
DMD64 D Compiler v2.094.0
$ cat dub.selections.json | grep mysql-native
                "mysql-native": "3.0.0",
  • コード (修正版)
import mysql.commands;
import mysql.connection;
import mysql.pool;
import std.algorithm : map;
import std.array :array;
import std.format : format;
import std.range : iota, repeat;
import std.stdio : writeln;
import std.string : join;

// vibe-coreのコネクションプールを使う
version = Have_vibe_core;

__gshared MySQLPool pool;

shared static this()
{
    string connStr = "host=localhost;port=3307;user=testuser;pwd=testpassword;db=testdb";
    // コネクションプールを生成
    pool = new MySQLPool(connStr);

    // デフォルトの最大並列接続数はuint.max
    // ここでは10に設定しておく
    pool.maxConcurrency(10);
}

auto bulkInsert(S, string table, S[] objects)(Connection conn)
{
    enum stmt = "INSERT INTO %s (%s) VALUES ".format(table, [__traits(allMembers, S)].join(","));
    enum rows = "(%s)".format("?".repeat(S.tupleof.length).join(",")).repeat(objects.length).join(",");
    mixin(`return conn.exec("` ~ stmt ~ rows ~ `",` ~
          objects.length.iota.map!(i => "objects[%d].tupleof".format(i)).join(",") ~
          ");");
}

void main()
{
    // プールからひとつコネクションをとってくる
    auto conn = pool.lockConnection();
    scope(exit) conn.close();

    // 一時テーブルを作成
    conn.exec("CREATE TEMPORARY TABLE `payment` (
        `customer_id` int not null,
        `amount` int not null,
        `account_name` text
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4");

    struct Payment
    {
        int customer_id;
        int amount;
        string account_name;
    }
    const payments = [
        Payment(1, 2, null),
        Payment(3, 4, "foo"),
        Payment(5, 6, null)];

    const affectedRows = conn.bulkInsert!(Payment, "payment", payments);

    enum select = "SELECT %s FROM payment".format([__traits(allMembers, Payment)].join(","));
    const result = conn.query(select)
        .map!(row => Payment(row[0].get!int, row[1].get!int, row[2].get!string))
        .array;

    writeln(result);
}

テストするのに最近はdockerを使っている。これは便利。 このくらいのクエリなら意味はないけど、bulk insertするときは max_allowed_packet を大きめに指定しておく。

$ docker run --rm -d -p 127.0.0.1:3307:3306 --name=mariadb10 -e MYSQL_DATABASE=testdb -e MYSQL_USER=testuser -e MYSQL_PASSWORD=testpassword -e MYSQL_ALLOW_EMPTY_PASSWORD=yes mariadb:10.1 --max_allowed_packet=16MB
  • 実行結果
$ dub run -q
[const(Payment)(1, 2, ""), const(Payment)(3, 4, "foo"), const(Payment)(5, 6, "")]