Taegis SDK for Pythonにおけるスレッド処理🔗
Taegis SDK for Pythonは、組み込みのスレッド処理をサポートしています。異なるAPIコール構成で作業する主な方法は、with service(): マネージャを用いたサービスコンテキストマネージャを使用することです。これらはスレッド化された関数内で使用する必要がありますが、すべての実行中のスレッドに適用される可能性のある変更は、スレッドが送信されるコードブロックの外側で適用することもできます。
スレッドの外側でサービスオブジェクトを構築することで、SDKは特定のサービスオブジェクトやGraphQLスキーマをスレッド間で作成・共有できます。これにより、複雑なやり取りにおいて大幅な時間短縮が可能となります。
from taegis_sdk_python import GraphQLService
import concurrent.futures
service = GraphQLService(environment="charlie")
def threaded_function(service: GraphQLService, tenant_id: str):
results = []
try:
# this will be constrained to just the the thread
# this will apply to all API calls within the context manager
with service(tenant_id=tenant_id):
# these can be ANY API calls that you want to thread
results.append(service.subjects.query.current_subject())
# the context manager can remain layered where you may want to request
# fields specific to an API call
with service(output="field fields { field field }")
results.append(service.clients.query.clients())
results.append(service.tenants.query.assignable_services())
except Exception as exc:
print(exc)
return []
return results
tenants = get_tenants() # defined elsewhere
future_results = {}
# The GraphQLService optionally supports shared configuration between threads.
# The below example supplies a user-managed token at the service level,
# which will be applied to all threads within the service,
# rather than requiring the user to supply the same token to each function.
with service(access_token=my_managed_token): # Optional
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(threaded_function, service, tenant.id): tenant.id
for tenant in tenants
}
print("Completing futures...")
for future in concurrent.futures.as_completed(futures):
# stitch results per tenant
tenant = futures[future]
future_results[tenant] = future.result()
例🔗
この例は新しいパフォーマンス指標を測定するために設計されていますが、スレッド処理の主なユースケース、すなわち複数のクエリを大量のテナントリストに対して実行する場合のセットアップ方法を示しています。
from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.tenants.types import TenantsQuery, Tenants
import concurrent.futures
import threading
from collections import Counter
service = GraphQLService()
production_environments = ["production", "delta", "echo", "foxtrot"]
def get_tenants(service: GraphQLService) -> List[Tenants]:
max_results = 1000
page_number = 1
result = service.tenants.query.tenants(
TenantsQuery(
max_results=max_results,
page_num=page_number,
)
)
results = [result]
while result.has_more:
page_number += 1
print(f"Polling page: {page_number}")
result = service.tenants.query.tenants(
TenantsQuery(
max_results=max_results,
page_num=page_number,
)
)
results.append(result)
return [
tenant
for result in results
for tenant in result.results
]
def create_test_tenants_list(tenants: List[Tenants]) -> List[Tenants]:
def single_production_enabled(environments):
return len([
True
for environment in environments
if environment.enabled and environment.name in production_environments
]) == 1
test_tenants = []
counter = Counter()
for tenant in tenants:
# finding tenants enabled in only a single environment so we don't have to verify which is the primary
if single_production_enabled(tenant.environments):
environment = [
environment.name
for environment in tenant.environments
if environment.enabled == True and environment.name in production_environments
][0]
# limit each environment to 100 tenants for consistency testing between environments
if counter[environment] < 100:
counter.update([environment])
test_tenants.append(tenant)
return test_tenants
def threaded_function(service: GraphQLService, tenant):
results = []
environment = [
environment.name
for environment in tenant.environments
if environment.enabled == True and environment.name in production_environments
][0]
try:
with service(tenant_id=tenant.id, environment=environment):
# these can be ANY API calls that you want to thread
results.append(service.subjects.query.current_subject())
results.append(service.clients.query.clients())
results.append(service.tenants.query.assignable_services())
except Exception as exc:
print(exc)
return []
return results
tenants = get_tenants(service)
test_tenants = create_test_tenants_list(tenants)
future_results = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(threaded_function, service, tenant): tenant.id
for tenant in test_tenants
}
print("Completing futures...")
for future in concurrent.futures.as_completed(futures):
# stitch results per tenant
tenant = futures[future]
future_results[tenant] = future.result()
# do something with future_results