コンテンツにスキップ

Taegis SDK for Pythonにおけるスレッド処理🔗

Taegis SDK for Pythonは、組み込みのスレッド処理をサポートしています。異なるAPIコール構成で作業する主な方法は、with service(): マネージャを用いたサービスコンテキストマネージャを使用することです。これらはスレッド化された関数内で使用する必要がありますが、すべての実行中のスレッドに適用される可能性のある変更は、スレッドが送信されるコードブロックの外側で適用することもできます。

スレッドの外側でサービスオブジェクトを構築することで、SDKは特定のサービスオブジェクトやGraphQLスキーマをスレッド間で作成・共有できます。これにより、複雑なやり取りにおいて大幅な時間短縮が可能となります。

from taegis_sdk_python import GraphQLService
import concurrent.futures

service = GraphQLService(environment="charlie")

def threaded_function(service: GraphQLService, tenant_id: str):
    results = []

    try:
        # this will be constrained to just the the thread
        # this will apply to all API calls within the context manager
        with service(tenant_id=tenant_id):
            # these can be ANY API calls that you want to thread
            results.append(service.subjects.query.current_subject())

            # the context manager can remain layered where you may want to request
            # fields specific to an API call
            with service(output="field fields { field field }")
                results.append(service.clients.query.clients())

            results.append(service.tenants.query.assignable_services())
    except Exception as exc:
        print(exc)
        return []

    return results

tenants = get_tenants() # defined elsewhere
future_results = {}
# The GraphQLService optionally supports shared configuration between threads.
# The below example supplies a user-managed token at the service level,
# which will be applied to all threads within the service,
# rather than requiring the user to supply the same token to each function.
with service(access_token=my_managed_token): # Optional
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {
            executor.submit(threaded_function, service, tenant.id): tenant.id
            for tenant in tenants
        }

        print("Completing futures...")
        for future in concurrent.futures.as_completed(futures):
            # stitch results per tenant
            tenant = futures[future]
            future_results[tenant] = future.result()

🔗

この例は新しいパフォーマンス指標を測定するために設計されていますが、スレッド処理の主なユースケース、すなわち複数のクエリを大量のテナントリストに対して実行する場合のセットアップ方法を示しています。

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.tenants.types import TenantsQuery, Tenants

import concurrent.futures
import threading
from collections import Counter

service = GraphQLService()
production_environments = ["production", "delta", "echo", "foxtrot"]


def get_tenants(service: GraphQLService) -> List[Tenants]:
    max_results = 1000
    page_number = 1

    result = service.tenants.query.tenants(
        TenantsQuery(
            max_results=max_results,
            page_num=page_number,
        )
    )

    results = [result]

    while result.has_more:
        page_number += 1
        print(f"Polling page: {page_number}")

        result = service.tenants.query.tenants(
            TenantsQuery(
                max_results=max_results,
                page_num=page_number,
            )
        )
        results.append(result)

    return [
        tenant
        for result in results
        for tenant in result.results
    ]

def create_test_tenants_list(tenants: List[Tenants]) -> List[Tenants]:
    def single_production_enabled(environments):
        return len([
            True 
            for environment in environments 
            if environment.enabled and environment.name in production_environments
        ]) == 1

    test_tenants = []
    counter = Counter()
    for tenant in tenants:
        # finding tenants enabled in only a single environment so we don't have to verify which is the primary
        if single_production_enabled(tenant.environments):
            environment = [
                environment.name
                for environment in tenant.environments
                if environment.enabled == True and environment.name in production_environments
            ][0]

            # limit each environment to 100 tenants for consistency testing between environments
            if counter[environment] < 100:
                counter.update([environment])
                test_tenants.append(tenant)

    return test_tenants

def threaded_function(service: GraphQLService, tenant):
    results = []

    environment = [
        environment.name
        for environment in tenant.environments
        if environment.enabled == True and environment.name in production_environments
    ][0]

    try:
        with service(tenant_id=tenant.id, environment=environment):
            # these can be ANY API calls that you want to thread
            results.append(service.subjects.query.current_subject())
            results.append(service.clients.query.clients())
            results.append(service.tenants.query.assignable_services())
    except Exception as exc:
        print(exc)
        return []

    return results

tenants = get_tenants(service)
test_tenants = create_test_tenants_list(tenants)

future_results = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {
        executor.submit(threaded_function, service, tenant): tenant.id
        for tenant in test_tenants
    }

    print("Completing futures...")
    for future in concurrent.futures.as_completed(futures):
        # stitch results per tenant
        tenant = futures[future]
        future_results[tenant] = future.result()

# do something with future_results