コンテンツにスキップ

Taegis SDK for Python での拡張とカスタマイズ🔗

GraphQL が提供できる利点の一つは、フェデレーションサービスです。これにより、単一の GraphQL コールで他のサービスからの結果を関連付けることができます。Taegis SDK for Python にはこの機能は標準では備わっていませんが、簡単に拡張することが可能です。

この例では、アラートデータと並べてイベントデータを追加したいと考えています。これは別のサービスからデータを追加するため、元の型からいくつかのデータクラスを拡張する必要があります。

スキーマを調査すると、event_data は Map フィールド(または Python の辞書)ですが、デフォルトでは AlertsResponse の出力文字列に含まれていません。SDK は、基底オブジェクトから継承し、フィールドをデータクラスに追加することで、このクラスをサポートするように拡張できます。

from taegis_sdk_python import GraphQLService, build_output_string
from taegis_sdk_python.services.alerts.types import AlertsResponse

service = GraphQLService()
schema = service.core.get_sync_schema()
print(schema.query_type.fields["alertsServiceSearch"])
print(schema.type_map["AlertsResponse"].fields)
print(schema.type_map["AlertsList"].fields)
print(schema.type_map["Alert2"].fields)
print(schema.type_map['AuxiliaryEvent'].fields)
# alertsServiceSearch
<GraphQLField <GraphQLObjectType 'AlertsResponse'>>
# AlertsResponse
{'status': <GraphQLField <GraphQLEnumType 'RPCResponseStatus'>>,
 'reason': <GraphQLField <GraphQLScalarType 'String'>>,
 'alerts': <GraphQLField <GraphQLObjectType 'AlertsList'>>,
 'search_id': <GraphQLField <GraphQLScalarType 'String'>>}
# AlertsList
{'list': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'Alert2'>>>>,
 'total_results': <GraphQLField <GraphQLScalarType 'Int'>>,
 'next_offset': <GraphQLField <GraphQLScalarType 'Int'>>,
 'previous_offset': <GraphQLField <GraphQLScalarType 'Int'>>,
 'last_offset': <GraphQLField <GraphQLScalarType 'Int'>>,
 'first_offset': <GraphQLField <GraphQLScalarType 'Int'>>,
 'total_parts': <GraphQLField <GraphQLScalarType 'Int'>>,
 'part': <GraphQLField <GraphQLScalarType 'Int'>>,
 'group_by': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'AggregationResponse'>>>>}
# Alert2
{'id': <GraphQLField <GraphQLNonNull <GraphQLScalarType 'ID'>>>,
 'group_key': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLScalarType 'String'>>>>,
 'metadata': <GraphQLField <GraphQLObjectType 'AlertsMetadata'>>,
 'visibility': <GraphQLField <GraphQLEnumType 'Visibility'>>,
 'attack_technique_ids': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLScalarType 'String'>>>>,
 'tenant_id': <GraphQLField <GraphQLScalarType 'String'>>,
 'parent_tenant_id': <GraphQLField <GraphQLScalarType 'String'>>,
 'suppressed': <GraphQLField <GraphQLScalarType 'Boolean'>>,
 'suppression_rules': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'AlertRuleReference'>>>>,
 'alerting_rules': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'AlertRuleReference'>>>>,
 'status': <GraphQLField <GraphQLEnumType 'ResolutionStatus'>>,
 'resolution_reason': <GraphQLField <GraphQLScalarType 'String'>>,
 'resolution_history': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'ResolutionMetadata'>>>>,
 'severity_history': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'SeverityUpdate'>>>>,
 'tags': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLScalarType 'String'>>>>,
 'sensor_types': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLScalarType 'String'>>>>,
 'entities': <GraphQLField <GraphQLObjectType 'EntityRelationships'>>,
 'key_entities': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'EntityMetadata'>>>>,
 'event_ids': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'AuxiliaryEvent'>>>>,
 'observation_ids': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'Observation'>>>>,
 'investigation_ids': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'Investigation'>>>>,
 'collection_ids': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'Collection'>>>>,
 'enrichment_details': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'EnrichmentDetail'>>>>,
 'third_party_details': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'ThirdPartyDetail'>>>>,
 'reference_details': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'ReferenceDetail'>>>>,
 'priority': <GraphQLField <GraphQLObjectType 'AlertPriority'>>,
 'events_metadata': <GraphQLField <GraphQLObjectType 'AlertEventMetadata'>>}
# AuxiliaryEvent
{'id': <GraphQLField <GraphQLNonNull <GraphQLScalarType 'ID'>>>,
 'event_data': <GraphQLField <GraphQLScalarType 'Map'>>,
 'observation_ids': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'Observation'>>>>,
 'investigations_resource_id': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'Investigation'>>>>,
 'alerts_resource_id': <GraphQLField <GraphQLList <GraphQLNonNull <GraphQLObjectType 'Alerts2'>>>>}
build_output_string(AlertsResponse)
...
event_ids { id }
...

注: 出力は冗長性のため省略されています。

このカスタムデータクラスを利用するすべてのオブジェクトツリーも更新する必要がありますが、変更されたフィールドのみを参照すれば十分です。この例のツリーは次の通りです: AlertsResponse > AlertsList > Alert2 > AuxiliaryEvent

from taegis_sdk_python import GraphQLService, prepare_input, build_output_string
from taegis_sdk_python.services.alerts.types import *
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json, config

@dataclass_json
@dataclass(order=True, eq=True, frozen=True)
class CustomAuxiliaryEvent(AuxiliaryEvent):
    """My Custom Auxiliary Event - Extends Auxiliary Event with event_data
    to take advantage of GQL federated services.
    """

    event_data: Optional[Dict[str, Any]] = field(
        default=None, metadata=config(field_name="event_data")
    )


@dataclass_json
@dataclass(order=True, eq=True, frozen=True)
class CustomAlert2(Alert2):
    """My Custom Alert2."""

    event_ids: Optional[List[CustomAuxiliaryEvent]] = field(
        default=None, metadata=config(field_name="event_ids")
    )



@dataclass_json
@dataclass(order=True, eq=True, frozen=True)
class CustomAlertsList(AlertsList):
    """My Custom AlertsList."""

    list: Optional[List[CustomAlert2]] = field(
        default=None, metadata=config(field_name="list")
    )


@dataclass_json
@dataclass(order=True, eq=True, frozen=True)
class CustomAlertsResponse(AlertsResponse):
    """My Custom AlertsResponse."""

    alerts: Optional[CustomAlertsList] = field(
        default=None, metadata=config(field_name="alerts")
    )


def alerts_service_search_with_events(service: GraphQLService, in_: SearchRequestInput) -> CustomAlertsResponse:
    """Query Taegis Alerts with corresponding Events attached."""
    endpoint = "alertsServiceSearch"
    result = service.alerts.execute_query(
        endpoint=endpoint,
        variables={
            "in": prepare_input(in_),
        },
        output=build_output_string(CustomAlertsResponse),
    )
    if result is not None:
        return CustomAlertsResponse.from_dict(result.get(endpoint))
    raise GraphQLNoRowsInResultSetError("for query alertsServiceSearch")