Skip to content

Make Queries with the Taegis SDK for Python๐Ÿ”—

The Taegis SDK for Python is set up to query detections, events, and cases out of the box. Following are a few examples; for a more extensive list, see Taegis SDK for Python Examples.

Query Detections๐Ÿ”—

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.alerts.types import SearchRequestInput

service = GraphQLService()
results = service.alerts.query.alerts_service_search(SearchRequestInput(
    cql_query="FROM detection WHERE severity >= 0.6 AND status = 'OPEN' EARLIEST=-3d",
    limit=10000,
    offset=0,
))

Detections Pagination๐Ÿ”—

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.alerts.types import SearchRequestInput, PollRequestInput, AlertsResponse

service = GraphQLService()
results = service.alerts.query.alerts_service_search(SearchRequestInput(
    cql_query="FROM detection WHERE severity >= 0.6 AND status = 'OPEN' EARLIEST=-3d",
    limit=1000000,
    offset=0,
))

poll_responses = [results]
search_id = results.search_id
total_parts = results.alerts.total_parts

if search_id:
    for part in range(2, total_parts + 1):
        results = None
        try:
            results = service.alerts.query.alerts_service_poll(
                PollRequestInput(
                    search_id=search_id,
                    part_id=part,
                )
            )
        except Exception as exc:
            if "not found" in str(exc):
                break
            raise exc

        if (
            isinstance(results, AlertsResponse)
            and results.alerts is not None
        ):
            poll_responses.append(results)

print(sum(
    len(response.alerts.list)
    for response in poll_responses
))

Query Events๐Ÿ”—

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.events.types import EventQueryOptions

service = GraphQLService()
options = EventQueryOptions(
    timestamp_ascending=True,
    page_size=1000,
    max_rows=1000,
    skip_cache=True,
    aggregation_off=False,
)

results = service.events.subscription.event_query("FROM process EARLIEST=-1d | head 10", options=options)

Events Pagination๐Ÿ”—

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.events.types import EventQueryResults, EventQueryOptions
from typing import List, Optional

def get_next_page(events_results: List[EventQueryResults]) -> Optional[str]:
    """Retrieve events  next page indicator."""
    try:
        # the next page could be found in any of the result pages,
        # but we cannot guarantee which result it will be found in
        return next(
            iter({result.next for result in events_results if result.next is not None})
        )
    except StopIteration:
        return None

service = GraphQLService()
options = EventQueryOptions(
    timestamp_ascending=True,
    page_size=1000,
    max_rows=100000,
    skip_cache=True,
    aggregation_off=False,
)
results = []

result = service.events.subscription.event_query("FROM process EARLIEST=-1d | head 10", options=options)
results.extend(result)
next_page = get_next_page(result)

while next_page:
    result = service.events.subscription.event_page(next_page)
    results.extend(result)
    next_page = get_next_page(result)

Query Cases๐Ÿ”—

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.investigations2.types import InvestigationsV2Arguments

page = 1
per_page = 30
cql = "WHERE deleted_at IS NULL EARLIEST=-30d | sort updated_at asc"

service = GraphQLService()

investigation_output = service.investigations2.query.investigations_v2(
    InvestigationsV2Arguments(
        page=page,
        per_page=per_page,
        cql=cql,
    )
)
investigation_output

Cases Pagination๐Ÿ”—

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.investigations2.types import InvestigationsV2Arguments

page = 1
per_page = 30
cql = "WHERE deleted_at IS NULL EARLIEST=-30d | sort updated_at asc"

results = []

service = GraphQLService()

investigation_output = service.investigations2.query.investigations_v2(
    InvestigationsV2Arguments(
        page=page,
        per_page=per_page,
        cql=cql,
    )
)
results.append(investigation_output)

total_count = investigation_output.total_count

while (
    sum_total := sum(len(result.investigations) for result in results)
) < total_count:
    page += 1
    investigation_output = service.investigations2.query.investigations_v2(
        InvestigationsV2Arguments(
            page=page,
            per_page=per_page,
            cql=cql,
        )
    )
    results.append(investigation_output)

investigations = [
    investigation for result in results for investigation in result.investigations
]