
Python - Microsoft 365 Defender Advanced hunting Query

by 올엠 2024. 3. 18.

Microsoft 365 Defender Advanced hunting Query 에서 사용할 수 있는 Python 코드이다.

mde_search 함수를 재활용해서, 입력한 값이 어떤 타입인지에 따라 검색하는 조건을 바꾸도록 설계되어 있다.

import json
import time
import argparse
import pandas as pd
import logging
import sys
import os
import re
import validators
import datetime
import random
import zipfile
import urllib.parse
from collections import OrderedDict
from urllib3.util.retry import Retry
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Certificate-validation warnings are muted because http_request() below
# passes verify=False on every call (see the notes on that function).
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Tenant id, app (client) id and client secret come from a plaintext JSON
# file in the current working directory: keep conf.json out of version
# control and restrict its file permissions to the account running this tool.
with open('./conf.json') as conf_file:
    conf = json.load(conf_file)

MDE_TENANT = conf['tenant']
MDE_APP = conf['app']
MDE_SECRET = conf['secret']
# Azure AD token endpoint for the tenant (client-credentials grant).
MDE_URL = f"https://login.microsoftonline.com/{MDE_TENANT}/oauth2/token"

# DEBUG also enables urllib3's connection/request-line logging.
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.DEBUG)
 
def requests_retry_session(
    retries=10,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Return a requests.Session that retries on both http:// and https://.

    Args:
        retries: retry budget used for urllib3's total, connect and read counts.
        backoff_factor: urllib3 exponential back-off multiplier between attempts.
        status_forcelist: response status codes that trigger a retry.
        session: Session to mount the adapter on; a new one is created when
            this is falsy.

    Returns:
        The supplied (or newly created) Session with the retrying adapter mounted.
    """
    if not session:
        session = requests.Session()
    retry_policy = Retry(
        total=retries,
        connect=retries,
        read=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    for scheme_prefix in ('http://', 'https://'):
        session.mount(scheme_prefix, retrying_adapter)
    return session
 
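def http_request(address, hr_jsondata, hr_headers=None, hr_verify=False):
    """POST (or GET when there is no body) *address*, retrying up to 10 times.

    Each attempt first sleeps a random 1-3 s (chosen once per call). A non-200
    response is logged and retried; an exception is logged and retried after
    one extra second. After ten failed attempts a final warning is logged.

    Args:
        address: URL to call.
        hr_jsondata: request body; when falsy a GET is sent instead of a POST.
        hr_headers: optional headers dict (e.g. the bearer Authorization).
        hr_verify: value passed as requests' ``verify``. The default False
            preserves the original behaviour of skipping TLS certificate
            validation, which exposes the client secret and bearer token sent
            through this function to an on-path attacker; pass True or a
            CA-bundle path wherever the network allows it.

    Returns:
        The last requests.Response received, or None when every attempt
        raised before a response arrived (the original returned an unbound
        local in that case).
    """
    res = None
    pause = random.randint(1, 3)
    for _attempt in range(10):
        try:
            time.sleep(pause)
            if hr_jsondata:
                res = requests_retry_session().post(url=address, data=hr_jsondata, headers=hr_headers, verify=hr_verify)
            else:
                res = requests_retry_session().get(url=address, headers=hr_headers, verify=hr_verify)
            if res.status_code == 200:
                return res
            # Non-200: keep the body for diagnosis but via logging, consistent
            # with the rest of this function, instead of bare print().
            logging.warning('httpRequest status %s: %s', res.status_code, res.text)
        except Exception as httpRequest_e:
            logging.warning('httpRequest: %s', str(httpRequest_e))
            time.sleep(1)
    logging.warning('httpRequest retry 10')
    return res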
 
 
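def mde_search(search_q, search_item, search_event=''):
    """Run one Advanced Hunting query and return its result as text.

    Steps: obtain a client-credentials token for
    api.securitycenter.microsoft.com from MDE_URL, POST *search_q* to
    /api/advancedqueries/run and render the returned ``Results`` rows with
    pandas. When more than 30 rows come back they are written to
    ./<month>/<search_item>/<search_item>.xlsx and only a
    "(<rows>/10000)" summary is returned inline.

    Args:
        search_q: KQL query text.
        search_item: the term being searched; also names the output
            directory and workbook.
        search_event: optional human-readable label of the query. Accepted so
            that callers passing three positional arguments (as __main__ does)
            no longer fail with TypeError; not used here.

    Returns:
        Tuple (result_text, excel_path). result_text is '' when no rows
        matched, the rendered DataFrame or the row-count summary otherwise,
        or the exception message when any step failed. excel_path is ''
        unless a workbook was written.
    """
    mde_search_result = ''
    mde_file_path = ''
    try:
        # OAuth2 client-credentials grant; the client secret travels in the body.
        resourceAppIdUri = 'https://api.securitycenter.microsoft.com'
        token_body = {
            'resource' : resourceAppIdUri,
            'client_id' : MDE_APP,
            'client_secret' : MDE_SECRET,
            'grant_type' : 'client_credentials'
        }
        token_data = urllib.parse.urlencode(token_body).encode("utf-8")
        token_resp = http_request(MDE_URL, token_data).json()
        aadToken = token_resp["access_token"]

        url = "https://api.securitycenter.microsoft.com/api/advancedqueries/run"
        headers = {
            'Content-Type' : 'application/json',
            'Accept' : 'application/json',
            'Authorization' : "Bearer " + aadToken
        }
        query_data = json.dumps({ 'Query' : search_q }).encode("utf-8")
        query_resp = http_request(url, query_data, headers).json()
        rows = query_resp['Results']
        if rows:
            df_result = pd.DataFrame(rows)
            df_string = df_result.to_string()
            if len(df_result.index) > 30:
                # Too large for stdout: summarise inline and spill the rows to Excel.
                df_string = '(' + str(len(df_result.index)) + '/10000)'
                # search_item is command-line input; strip path separators and
                # leading dots so it can only name a directory under ./<month>/.
                safe_item = re.sub(r'[\\/]', '_', str(search_item)).lstrip('.') or 'result'
                task_date = datetime.datetime.now()
                directory = os.path.join('.', str(task_date.month), safe_item)
                os.makedirs(directory, exist_ok=True)
                mde_file_path = os.path.join(directory, safe_item + '.xlsx')
                df_result.to_excel(mde_file_path)
            mde_search_result = df_string
    except Exception as main_mde_e:
        # Failures are reported as the result text rather than raised; the
        # caller prints whatever comes back.
        main_mde_error = str(main_mde_e)
        mde_search_result = main_mde_error
        logging.warning('main_mde_error: %s', main_mde_error)
        time.sleep(1)
    return mde_search_result, mde_file_path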
 
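# IoC classifiers consulted by build_search(), in the order used there.
IP_REGEX = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$"
MD5_REGEX = r"\b(?!^[\d]*$)(?!^[a-fA-F]*$)([a-f\d]{32}|[A-F\d]{32})\b"
# Anchored at both ends: the original r"[A-Fa-f0-9]{64}" accepted any term
# that merely started with 64 hex digits.
SHA256_REGEX = r"^[A-Fa-f0-9]{64}$"
DOMAIN_REGEX = r"^((?!-))(xn--)?[a-z0-9][a-z0-9-_]{0,61}[a-z0-9]{0,1}\.(xn--)?([a-z0-9\-]{1,61}|[a-z0-9-]{1,30}\.[a-z]{2,})$"
# Defined in the original script but not consulted by any branch; kept for reference.
ACCOUNT_REGEX = r"^(?![-._])(?!.*[_.-]{2})[\w.-]{6,30}(?<![-._])$"


def build_search(search_item):
    """Map one lower-cased search term to an Advanced Hunting query.

    Classification order: IPv4 address, MD5, SHA-256, URL (validators.url),
    domain, 'cve-' prefix, otherwise a DeviceTvmSoftwareInventory name search.
    Every branch yields a query, so the result is never empty.

    Args:
        search_item: the term to look up. It is interpolated verbatim into a
            KQL double-quoted literal, so the caller must reject terms that
            contain '"' or a backslash.

    Returns:
        Tuple (search_event, search_query): a short label of the tables
        searched, and the KQL text.
    """
    if re.match(IP_REGEX, search_item):
        # Network, process and registry telemetry keyed on either IP column.
        search_event = '`Network, Process, Registry Events`\n- Search RemoteIP, LocalIP'
        search_query = 'find in (DeviceNetworkEvents, DeviceProcessEvents, DeviceRegistryEvents) where Timestamp > ago(30d) and (RemoteIP contains "{0}" or LocalIP contains "{0}") | summarize by DeviceName, LocalIP, RemoteIP, InitiatingProcessFileName | project-rename device=DeviceName, local=LocalIP, remoteip=RemoteIP, processname=InitiatingProcessFileName| take 50000'.format(search_item) # Paste your own query here
    elif re.match(MD5_REGEX, search_item):
        # File hash and process telemetry.
        search_event = '`File, Process, ImageLoad Events`\n- Search MD5, InitiatingProcessMD5'
        search_query = 'find in (DeviceFileEvents, DeviceProcessEvents, DeviceImageLoadEvents) where Timestamp > ago(30d) and (MD5 == "{0}" or InitiatingProcessMD5 == "{0}") | summarize by DeviceName, InitiatingProcessFileName | project-rename device=DeviceName, processname=InitiatingProcessFileName| take 10000'.format(search_item)
    elif re.match(SHA256_REGEX, search_item):
        # Network, process, registry, file hash and image-load telemetry.
        search_event = '`File, Process, Device, Registry, Network, ImageLoad Events`\n- Search SHA256, InitiatingProcessSHA256'
        search_query = 'find in (DeviceFileEvents, DeviceProcessEvents, DeviceEvents, DeviceRegistryEvents, DeviceNetworkEvents, DeviceImageLoadEvents) where Timestamp > ago(30d) and (SHA256 == "{0}" or InitiatingProcessSHA256 == "{0}") | summarize by DeviceName, InitiatingProcessFileName | project-rename device=DeviceName, processname=InitiatingProcessFileName| take 50000'.format(search_item)
    elif validators.url(search_item):
        # RemoteUrl and FolderPath.
        search_event = '`Network, File Events`\n- Search RemoteUrl, FolderPath'
        search_query = 'find in (DeviceNetworkEvents, DeviceFileEvents) where Timestamp > ago(30d) and (RemoteUrl contains "{0}" or FolderPath contains "{0}") | summarize by DeviceName, RemoteUrl, FolderPath, InitiatingProcessFileName | project-rename device=DeviceName, remoteurl=RemoteUrl, folder=FolderPath, processname=InitiatingProcessFileName| take 50000'.format(search_item)
    elif re.match(DOMAIN_REGEX, search_item):
        # Same tables as the URL branch, matched by a bare domain.
        search_event = '`Network, File Events`\n- Search RemoteUrl, FolderPath'
        search_query = 'find in (DeviceNetworkEvents, DeviceFileEvents) where Timestamp > ago(30d) and (RemoteUrl contains "{0}" or FolderPath contains "{0}") | summarize by DeviceName, RemoteUrl, FolderPath, InitiatingProcessFileName | project-rename device=DeviceName, remoteurl=RemoteUrl, folder=FolderPath, processname=InitiatingProcessFileName| take 50000'.format(search_item)
    elif search_item.startswith('cve-'):
        # Vulnerability management: devices exposed to the CVE.
        search_event = '`SoftwareVulnerabilities`\n- Search CveId'
        search_query = 'find in (DeviceTvmSoftwareVulnerabilities) where (CveId contains "{}") | take 50000'.format(search_item)
    else:
        # Fallback: software inventory by (partial) product name.
        search_event = '`DeviceTvmSoftwareInventory`\n- Search SoftwareName'
        search_query = 'find in (DeviceTvmSoftwareInventory) where (SoftwareName contains "{}") | project DeviceId, DeviceName, OSPlatform, SoftwareVendor, SoftwareName, SoftwareVersion | summarize count() by SoftwareName, SoftwareVersion | take 50000'.format(search_item)
    return search_event, search_query


if __name__ == "__main__":
    # Entry point: parse -q/--query, split it into terms and run one hunting
    # query per term, printing each result.
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--query', help="search query", required=True)
    args, unknown = parser.parse_known_args()
    # str.split() with no separator drops the empty strings that a run of
    # spaces produced with split(' '); an empty term would otherwise reach the
    # SoftwareName branch and match every software row.
    for search_item in args.query.split():
        search_item = search_item.lower()
        # The term is pasted into a KQL double-quoted literal in build_search();
        # a quote or backslash would break out of (or corrupt) that literal.
        if '"' in search_item or '\\' in search_item:
            logging.warning('skipping %r: quotes and backslashes are not allowed in a search term', search_item)
            continue
        try:
            search_event, search_query = build_search(search_item)
            logging.debug('%s', search_event)
            main_result, search_result_file = mde_search(search_query, search_item)
            if search_result_file:
                logging.info('results written to %s', search_result_file)
            print(main_result)
        except Exception:
            # The original `except Exception: pass` hid every failure here
            # (for example a TypeError from a mismatched mde_search() call),
            # so the script printed nothing and exited 0.
            logging.exception('search for %r failed', search_item)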

환경 설정 파일인 conf.json은 다음과 같이 작업하면 된다.

{
    "tenant":"input here",
    "app":"input here",
    "secret":"input here"
}