SDice技術メモ

技術備忘録や仕事関連でやったこと

Pythonでの並列処理

Pythonを使用して並列処理を記載しました。
書いている時に、スレッドが動作するタイミングがfor文が最後まで動作後に実行されてしまったり、Exceptionが出力されないなど気づくことが多くありましたので、備忘録として残そうと思いました。
中々細かい点が思い道理にサクッと書けないことに、自分の力不足を感じます。
なんとか実力をつける為、とにかく毎日書きまくるしかないのかなと。。。

サンプルとして下記のソースを記載しました。
IDが記載されたファイルからIDリストを取得し、ID毎に並列処理でファイル検索を行うサンプルです。

# coding:utf-8
import time
import glob
import os
import traceback
import concurrent.futures

from functools import partial


def print_time(func):
    """
    実行時間計測用デコレータ
    """
    def wrapper(*args, **kwargs):
        t1 = time.time()
        print("START")
        func(*args, **kwargs)
        print("END")
        t2 = time.time()
        print("経過時間{} 's ".format(t1 - t2))

    return wrapper


def find_file_path(data_dir, id):
    """
    ディレクトリ配下のIDファイルを検索
        Args:
            data_dir (str):検索ディレクトリ
            id (str):検索ファイルID
        Returns:
            file_dict(dict) 拡張子ごとにパスを格納した辞書
    """
    # 検索使用するパス
    find_path = os.path.join(data_dir, "**", "*" + id + "*")
    # フォルダ内を再起的な検索
    files = glob.glob(find_path, recursive=True)

    # 検索後にdictに格納する拡張子
    extensions = ['xml', 'txt', 'py']

    # 辞書作成
    file_dict = {}
    for file in files:
        for extension in extensions:
            if file.endswith(os.extsep + extension):
                file_dict[extension] = file

    return file_dict


def create_thread(data_dir, id_list):
    """
    スレッド作成ジェネレータ
    Args:
        data_dir (str):検索対象のディレクトリ
    Returns:
        thread:ID単位のスレッド
    """
    for id in id_list:
        def _thread(id):
            """
            スレッド実行(id単位)
                Args:
                    id (id):id
            """
            try:
                files = find_file_path(data_dir, id)
                print(files)
            except Exception:
                traceback.print_exc()

        yield partial(_thread, id)


def get_id_list(file_path):
    """
    IDリスト作成
        Arags:
            file_path (str):ファイルのパス
        Returns:
            id_list(list):idリスト
    """
    id_list = []
    with open(file_path, 'r') as f:
        for line in f:
            # 改行コード削除してリストへ追加
            id_list.append(line.rstrip('\n'))
    return id_list


@print_time
def main():
    """
    メイン処理
    """
    # idリストファイルよりidリストを取得
    id_list = get_id_list('./id_file.txt')

    # スレッドジェネレータ作成
    threads = create_thread('./data_dir', id_list)

    # スレッドプールからスレッド実行
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    for thread in threads:
        executor.submit(thread)


if __name__ == "__main__":
    main()