# Copyright (c) 2003-2016 CORE Security Technologies
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Author: Alberto Solino (beto@coresecurity.com)
#
# Description:
#   SPNEGO functions used by SMB, SMB2/3 and DCERPC
#
# All tokens handled here are byte strings. The module is written for
# Python 2 (where ``bytes`` is ``str``); the constructs used also run
# unchanged on Python 3.
#

from struct import pack, unpack, calcsize

############### GSS Stuff ################
GSS_API_SPNEGO_UUID = b'\x2b\x06\x01\x05\x05\x02'
ASN1_SEQUENCE = 0x30
ASN1_AID = 0x60
ASN1_OID = 0x06
ASN1_OCTET_STRING = 0x04
ASN1_MECH_TYPE = 0xa0
ASN1_MECH_TOKEN = 0xa2
ASN1_SUPPORTED_MECH = 0xa1
ASN1_RESPONSE_TOKEN = 0xa2
ASN1_ENUMERATED = 0x0a

# Encoded mechanism OID (content bytes only, no tag/length) -> description.
MechTypes = {
    b'+\x06\x01\x04\x01\x827\x02\x02\x1e': 'SNMPv2-SMI::enterprises.311.2.2.30',
    b'+\x06\x01\x04\x01\x827\x02\x02\n': 'NTLMSSP - Microsoft NTLM Security Support Provider',
    b'*\x86H\x82\xf7\x12\x01\x02\x02': 'MS KRB5 - Microsoft Kerberos 5',
    b'*\x86H\x86\xf7\x12\x01\x02\x02': 'KRB5 - Kerberos 5',
    b'*\x86H\x86\xf7\x12\x01\x02\x02\x03': 'KRB5 - Kerberos 5 - User to User',
}

# Reverse lookup: description -> encoded mechanism OID.
TypesMech = dict((v, k) for k, v in MechTypes.items())


def asn1encode(data=b''):
    """Prefix *data* with its length field and return the result.

    Lengths up to 0x7F use the one-byte short form; longer payloads use
    the long form with a 0x81..0x84 marker followed by 1 to 4 big-endian
    length bytes. The tag byte is NOT added; callers prepend it.

    Raises:
        Exception: if *data* is longer than 0xFFFFFFFF bytes.
    """
    length = len(data)
    if length <= 0x7F:
        header = pack('B', length)
    elif length <= 0xFF:
        header = pack('BB', 0x81, length)
    elif length <= 0xFFFF:
        header = pack('!BH', 0x82, length)
    elif length <= 0xFFFFFF:
        # 3-byte length: high byte followed by the low 16 bits.
        header = pack('!BBH', 0x83, length >> 16, length & 0xFFFF)
    elif length <= 0xFFFFFFFF:
        header = pack('!BL', 0x84, length)
    else:
        raise Exception('Error in asn1encode')
    # bytes() is str() on Python 2, so the result is always a byte string.
    return bytes(header + data)


def asn1decode(data=b''):
    """Decode one length-prefixed element from the start of *data*.

    *data* must start at the length field (the tag byte already removed).

    Returns:
        A ``(content, consumed)`` tuple. ``consumed`` counts the length
        field plus the content bytes actually present, so
        ``data[consumed:]`` is whatever follows this element.

    Raises:
        struct.error: if *data* is too short to hold the length field.
    """
    len1 = unpack('B', data[:1])[0]
    data = data[1:]
    if len1 == 0x81:
        pad = calcsize('B')
        length = unpack('B', data[:pad])[0]
    elif len1 == 0x82:
        pad = calcsize('H')
        length = unpack('!H', data[:pad])[0]
    elif len1 == 0x83:
        pad = calcsize('B') + calcsize('!H')
        high, low = unpack('!BH', data[:pad])
        # Parenthesised on purpose: '+' binds tighter than '<<'.
        length = (high << 16) + low
    elif len1 == 0x84:
        pad = calcsize('!L')
        length = unpack('!L', data[:pad])[0]
    else:
        # 1 byte length, string <= 0x7F
        pad = 0
        length = len1
    ans = data[pad:pad + length]
    return ans, len(ans) + pad + 1


def _tlv(tag, content):
    """Return *content* wrapped in a one-byte *tag* and its length field."""
    return pack('B', tag) + asn1encode(content)


def _peek_tag(data):
    """Return the first byte of *data* as an int (struct.error if empty)."""
    return unpack('B', data[:1])[0]


class GSSAPI:
    """Generic GSSAPI header: AID tag, OID, then an opaque payload.

    Fields are stored in ``self.fields`` and accessed with item syntax.
    ``'UUID'`` is preset to the SPNEGO OID and is what getData() emits;
    fromString() stores the OID it read under ``'OID'`` and the remainder
    under ``'Payload'``.
    """

    def __init__(self, data=None):
        self.fields = {}
        self['UUID'] = GSS_API_SPNEGO_UUID
        if data:
            self.fromString(data)

    def __setitem__(self, key, value):
        self.fields[key] = value

    def __getitem__(self, key):
        return self.fields[key]

    def __delitem__(self, key):
        del self.fields[key]

    def __len__(self):
        return len(self.getData())

    def __str__(self):
        # Must return a string, not a length. On Python 2 the encoded
        # bytes already are a str; elsewhere map bytes 1:1 to code points.
        data = self.getData()
        if isinstance(data, str):
            return data
        return data.decode('latin-1')

    def fromString(self, data=None):
        """Parse a GSSAPI header from *data*.

        Expected layout:
            AID = 0x60 TAG, BER Length
            OID = 0x06 TAG
            GSSAPI OID
            UUID data (BER Encoded)
            Payload

        Raises:
            Exception: if the AID or OID tag is not where expected.
        """
        tag = _peek_tag(data)
        if tag != ASN1_AID:
            raise Exception('Unknown AID=%x' % tag)
        decode_data, total_bytes = asn1decode(data[1:])

        # Now we should have a OID tag
        tag = _peek_tag(decode_data)
        if tag != ASN1_OID:
            raise Exception('OID tag not found %x' % tag)
        decode_data = decode_data[1:]

        # Now the OID contents, should be SPNEGO UUID
        uuid, total_bytes = asn1decode(decode_data)
        self['OID'] = uuid
        # the rest should be the data
        self['Payload'] = decode_data[total_bytes:]

    def dump(self):
        """Print every field as ``name: {repr(value)}``."""
        for key in self.fields:
            print("%s: {%r}" % (key, self[key]))

    def getData(self):
        """Return the encoded header followed by ``self['Payload']``.

        Raises:
            KeyError: if ``'Payload'`` has not been set.
        """
        return _tlv(ASN1_AID,
                    _tlv(ASN1_OID, self['UUID']) + self['Payload'])


class SPNEGO_NegTokenResp:
    # http://tools.ietf.org/html/rfc4178#page-9
    # NegTokenResp ::= SEQUENCE {
    #     negState       [0] ENUMERATED {
    #         accept-completed    (0),
    #         accept-incomplete   (1),
    #         reject              (2),
    #         request-mic         (3)
    #     }                                 OPTIONAL,
    #       -- REQUIRED in the first reply from the target
    #     supportedMech   [1] MechType      OPTIONAL,
    #       -- present only in the first reply from the target
    #     responseToken   [2] OCTET STRING  OPTIONAL,
    #     mechListMIC     [3] OCTET STRING  OPTIONAL,
    #     ...
    # }
    # This structure is not prepended by a GSS generic header!
    #
    # Fields used here: 'NegResult', 'SupportedMech', 'ResponseToken'
    # (each holding the raw content bytes of the corresponding element).
    SPNEGO_NEG_TOKEN_RESP = 0xa1
    SPNEGO_NEG_TOKEN_TARG = 0xa0

    def __init__(self, data=None):
        self.fields = {}
        if data:
            self.fromString(data)

    def __setitem__(self, key, value):
        self.fields[key] = value

    def __getitem__(self, key):
        return self.fields[key]

    def __delitem__(self, key):
        del self.fields[key]

    def __len__(self):
        return len(self.getData())

    def __str__(self):
        # Must return a string, not a length. On Python 2 the encoded
        # bytes already are a str; elsewhere map bytes 1:1 to code points.
        data = self.getData()
        if isinstance(data, str):
            return data
        return data.decode('latin-1')

    def fromString(self, data=0):
        """Parse a NegTokenResp from *data* into the field dictionary.

        Sets 'NegResult' and 'SupportedMech' when those elements are
        present, and 'ResponseToken' when a response token follows.

        Raises:
            Exception: if an unexpected tag is found.
            struct.error: if the token is truncated before a required tag.
        """
        tag = _peek_tag(data)
        if tag != SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP:
            raise Exception('NegTokenResp not found %x' % tag)
        decode_data, total_bytes = asn1decode(data[1:])

        tag = _peek_tag(decode_data)
        if tag != ASN1_SEQUENCE:
            raise Exception('SEQUENCE tag not found %x' % tag)
        decode_data, total_bytes = asn1decode(decode_data[1:])

        tag = _peek_tag(decode_data)
        if tag == ASN1_MECH_TYPE:
            # [0] negState: an ENUMERATED wrapped in the context tag.
            element, consumed = asn1decode(decode_data[1:])
            tag = _peek_tag(element)
            if tag != ASN1_ENUMERATED:
                raise Exception('Enumerated tag not found %x' % tag)
            # Decode the ENUMERATED content itself (skip its tag byte).
            self['NegResult'] = asn1decode(element[1:])[0]
            # Skip the context tag plus everything asn1decode consumed.
            decode_data = decode_data[1 + consumed:]

            # Do we have more data?
            if len(decode_data) == 0:
                return

            tag = _peek_tag(decode_data)
            if tag == ASN1_SUPPORTED_MECH:
                # [1] supportedMech: an OID wrapped in the context tag.
                element, consumed = asn1decode(decode_data[1:])
                tag = _peek_tag(element)
                if tag != ASN1_OID:
                    raise Exception('OID tag not found %x' % tag)
                self['SupportedMech'] = asn1decode(element[1:])[0]
                decode_data = decode_data[1 + consumed:]

                # responseToken is OPTIONAL: stop cleanly if nothing follows.
                if len(decode_data) == 0:
                    return

                tag = _peek_tag(decode_data)
                if tag != ASN1_RESPONSE_TOKEN:
                    raise Exception('Response token tag not found %x' % tag)
            elif tag != ASN1_RESPONSE_TOKEN:
                raise Exception('Supported Mech/ResponseToken tag not found %x' % tag)
        elif tag != ASN1_RESPONSE_TOKEN:
            # MechType not found, could be an AUTH answer
            raise Exception('MechType/ResponseToken tag not found %x' % tag)

        # [2] responseToken: an OCTET STRING wrapped in the context tag.
        decode_data, total_bytes = asn1decode(decode_data[1:])
        tag = _peek_tag(decode_data)
        if tag != ASN1_OCTET_STRING:
            raise Exception('Octet string token tag not found %x' % tag)
        self['ResponseToken'] = asn1decode(decode_data[1:])[0]

    def dump(self):
        """Print every field as ``name: {repr(value)}``."""
        for key in self.fields:
            print("%s: {%r}" % (key, self[key]))

    def getData(self):
        """Return the encoded NegTokenResp.

        Three layouts are produced, depending on which fields are set:
          * 'NegResult' and 'SupportedMech': negState, supportedMech and
            responseToken (server response);
          * 'NegResult' only: negState alone (server response);
          * otherwise: responseToken alone (client response).

        Raises:
            KeyError: if 'ResponseToken' is needed by the chosen layout
                but has not been set.
        """
        fields = self.fields
        if 'NegResult' in fields and 'SupportedMech' in fields:
            # Server resp
            body = (
                _tlv(SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG,
                     _tlv(ASN1_ENUMERATED, self['NegResult'])) +
                _tlv(ASN1_SUPPORTED_MECH,
                     _tlv(ASN1_OID, self['SupportedMech'])) +
                _tlv(ASN1_RESPONSE_TOKEN,
                     _tlv(ASN1_OCTET_STRING, self['ResponseToken'])))
        elif 'NegResult' in fields:
            # Server resp
            body = _tlv(SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG,
                        _tlv(ASN1_ENUMERATED, self['NegResult']))
        else:
            # Client resp
            body = _tlv(ASN1_RESPONSE_TOKEN,
                        _tlv(ASN1_OCTET_STRING, self['ResponseToken']))
        return _tlv(SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP,
                    _tlv(ASN1_SEQUENCE, body))


class SPNEGO_NegTokenInit(GSSAPI):
    # http://tools.ietf.org/html/rfc4178#page-8
    # NegTokenInit ::= SEQUENCE {
    #     mechTypes     [0] MechTypeList,
    #     reqFlags      [1] ContextFlags  OPTIONAL,
    #     mechToken     [2] OCTET STRING  OPTIONAL,
    #     mechListMIC   [3] OCTET STRING  OPTIONAL,
    # }
    #
    # Fields used here: 'MechTypes' (list of encoded OIDs) and the
    # optional 'MechToken', on top of the GSSAPI header fields.
    SPNEGO_NEG_TOKEN_INIT = 0xa0

    def fromString(self, data=0):
        """Parse a GSSAPI-wrapped NegTokenInit from *data*.

        Sets 'MechTypes' to the list of OIDs found and, when a mechToken
        element follows the mechanism list, 'MechToken' to its content.

        Raises:
            Exception: if an unexpected tag is found.
        """
        GSSAPI.fromString(self, data)
        payload = self['Payload']

        tag = _peek_tag(payload)
        if tag != SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT:
            raise Exception('NegTokenInit not found %x' % tag)
        decode_data, total_bytes = asn1decode(payload[1:])

        # Now we should have a SEQUENCE Tag
        tag = _peek_tag(decode_data)
        if tag != ASN1_SEQUENCE:
            raise Exception('SEQUENCE tag not found %x' % tag)
        decode_data, total_bytes2 = asn1decode(decode_data[1:])

        tag = _peek_tag(decode_data)
        if tag != ASN1_MECH_TYPE:
            raise Exception('MechType tag not found %x' % tag)
        # Keep the buffer that starts at the mechTypes length field so the
        # elements following mechTypes can be located afterwards.
        remaining_data = decode_data[1:]
        decode_data, total_bytes3 = asn1decode(remaining_data)

        tag = _peek_tag(decode_data)
        if tag != ASN1_SEQUENCE:
            raise Exception('SEQUENCE tag not found %x' % tag)
        decode_data, total_bytes4 = asn1decode(decode_data[1:])

        # And finally we should have the MechTypes
        self['MechTypes'] = []
        while decode_data:
            if _peek_tag(decode_data) != ASN1_OID:
                # Not a valid OID, there must be something else we won't unpack
                break
            decode_data = decode_data[1:]
            item, total_bytes = asn1decode(decode_data)
            self['MechTypes'].append(item)
            decode_data = decode_data[total_bytes:]

        # Do we have MechTokens as well?
        decode_data = remaining_data[total_bytes3:]
        if len(decode_data) > 0 and _peek_tag(decode_data) == ASN1_MECH_TOKEN:
            # We have tokens in here!
            decode_data, total_bytes = asn1decode(decode_data[1:])
            if _peek_tag(decode_data) == ASN1_OCTET_STRING:
                decode_data, total_bytes = asn1decode(decode_data[1:])
                self['MechToken'] = decode_data

    def getData(self):
        """Return the encoded NegTokenInit wrapped in the GSSAPI header.

        Side effect: stores the encoded NegTokenInit in ``self['Payload']``.

        Raises:
            KeyError: if 'MechTypes' has not been set.
        """
        mechTypes = b''.join(_tlv(ASN1_OID, mech) for mech in self['MechTypes'])

        # Do we have tokens to send?
        if 'MechToken' in self.fields:
            mechToken = _tlv(ASN1_MECH_TOKEN,
                             _tlv(ASN1_OCTET_STRING, self['MechToken']))
        else:
            mechToken = b''

        self['Payload'] = _tlv(
            SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT,
            _tlv(ASN1_SEQUENCE,
                 _tlv(ASN1_MECH_TYPE, _tlv(ASN1_SEQUENCE, mechTypes)) +
                 mechToken))
        return GSSAPI.getData(self)