#!/usr/bin/env python # Impacket - Collection of Python classes for working with network protocols. # # SECUREAUTH LABS. Copyright (C) 2021 SecureAuth Corporation. All rights reserved. # # This software is provided under a slightly modified version # of the Apache Software License. See the accompanying LICENSE file # for more information. # # Description: # DCE/RPC lookup sid brute forcer example # # Author: # Alberto Solino (@agsolino) # # Reference for: # DCE/RPC [MS-LSAT] # from __future__ import division from __future__ import print_function import sys import logging import argparse import codecs from impacket.examples import logger from impacket.examples.utils import parse_target from impacket import version from impacket.dcerpc.v5 import transport, lsat, lsad from impacket.dcerpc.v5.samr import SID_NAME_USE from impacket.dcerpc.v5.dtypes import MAXIMUM_ALLOWED from impacket.dcerpc.v5.rpcrt import DCERPCException class LSALookupSid: KNOWN_PROTOCOLS = { 139: {'bindstr': r'ncacn_np:%s[\pipe\lsarpc]', 'set_host': True}, 445: {'bindstr': r'ncacn_np:%s[\pipe\lsarpc]', 'set_host': True}, } def __init__(self, username='', password='', domain='', port = None, hashes = None, domain_sids = False, maxRid=4000): self.__username = username self.__password = password self.__port = port self.__maxRid = int(maxRid) self.__domain = domain self.__lmhash = '' self.__nthash = '' self.__domain_sids = domain_sids if hashes is not None: self.__lmhash, self.__nthash = hashes.split(':') def dump(self, remoteName, remoteHost): logging.info('Brute forcing SIDs at %s' % remoteName) stringbinding = self.KNOWN_PROTOCOLS[self.__port]['bindstr'] % remoteName logging.info('StringBinding %s'%stringbinding) rpctransport = transport.DCERPCTransportFactory(stringbinding) rpctransport.set_dport(self.__port) if self.KNOWN_PROTOCOLS[self.__port]['set_host']: rpctransport.setRemoteHost(remoteHost) if hasattr(rpctransport, 'set_credentials'): # This method exists only for selected protocol sequences. rpctransport.set_credentials(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash) try: self.__bruteForce(rpctransport, self.__maxRid) except Exception as e: if logging.getLogger().level == logging.DEBUG: import traceback traceback.print_exc() logging.critical(str(e)) raise def __bruteForce(self, rpctransport, maxRid): dce = rpctransport.get_dce_rpc() entries = [] dce.connect() # Want encryption? Uncomment next line # But make SIMULTANEOUS variable <= 100 #dce.set_auth_level(ntlm.NTLM_AUTH_PKT_PRIVACY) # Want fragmentation? Uncomment next line #dce.set_max_fragment_size(32) dce.bind(lsat.MSRPC_UUID_LSAT) resp = lsad.hLsarOpenPolicy2(dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES) policyHandle = resp['PolicyHandle'] if self.__domain_sids: # get the Domain SID resp = lsad.hLsarQueryInformationPolicy2(dce, policyHandle, lsad.POLICY_INFORMATION_CLASS.PolicyPrimaryDomainInformation) domainSid = resp['PolicyInformation']['PolicyPrimaryDomainInfo']['Sid'].formatCanonical() else: # Get the target host SID resp = lsad.hLsarQueryInformationPolicy2(dce, policyHandle, lsad.POLICY_INFORMATION_CLASS.PolicyAccountDomainInformation) domainSid = resp['PolicyInformation']['PolicyAccountDomainInfo']['DomainSid'].formatCanonical() logging.info('Domain SID is: %s' % domainSid) soFar = 0 SIMULTANEOUS = 1000 for j in range(maxRid//SIMULTANEOUS+1): if (maxRid - soFar) // SIMULTANEOUS == 0: sidsToCheck = (maxRid - soFar) % SIMULTANEOUS else: sidsToCheck = SIMULTANEOUS if sidsToCheck == 0: break sids = list() for i in range(soFar, soFar+sidsToCheck): sids.append(domainSid + '-%d' % i) try: lsat.hLsarLookupSids(dce, policyHandle, sids,lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta) except DCERPCException as e: if str(e).find('STATUS_NONE_MAPPED') >= 0: soFar += SIMULTANEOUS continue elif str(e).find('STATUS_SOME_NOT_MAPPED') >= 0: resp = e.get_packet() else: raise for n, item in enumerate(resp['TranslatedNames']['Names']): if item['Use'] != SID_NAME_USE.SidTypeUnknown: print("%d: %s\\%s (%s)" % ( soFar + n, resp['ReferencedDomains']['Domains'][item['DomainIndex']]['Name'], item['Name'], SID_NAME_USE.enumItems(item['Use']).name)) soFar += SIMULTANEOUS dce.disconnect() return entries # Process command-line arguments. if __name__ == '__main__': # Explicitly changing the stdout encoding format if sys.stdout.encoding is None: # Output is redirected to a file sys.stdout = codecs.getwriter('utf8')(sys.stdout) print(version.BANNER) parser = argparse.ArgumentParser() parser.add_argument('target', action='store', help='[[domain/]username[:password]@]') parser.add_argument('maxRid', action='store', default = '4000', nargs='?', help='max Rid to check (default 4000)') parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output') group = parser.add_argument_group('connection') group.add_argument('-target-ip', action='store', metavar="ip address", help='IP Address of the target machine. ' 'If omitted it will use whatever was specified as target. This is useful when target is the ' 'NetBIOS name and you cannot resolve it') group.add_argument('-port', choices=['139', '445'], nargs='?', default='445', metavar="destination port", help='Destination port to connect to SMB Server') group.add_argument('-domain-sids', action='store_true', help='Enumerate Domain SIDs (will likely forward requests to the DC)') group = parser.add_argument_group('authentication') group.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH') group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful when proxying through smbrelayx)') if len(sys.argv)==1: parser.print_help() sys.exit(1) options = parser.parse_args() # Init the example's logger theme logger.init(options.ts) domain, username, password, remoteName = parse_target(options.target) if domain is None: domain = '' if password == '' and username != '' and options.hashes is None and options.no_pass is False: from getpass import getpass password = getpass("Password:") if options.target_ip is None: options.target_ip = remoteName lookup = LSALookupSid(username, password, domain, int(options.port), options.hashes, options.domain_sids, options.maxRid) try: lookup.dump(remoteName, options.target_ip) except: pass