# Copyright (c) 2015, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.
import base64
import bz2
import zlib
import cybox
import cybox.bindings.artifact_object as artifact_binding
from cybox.common import ObjectProperties, String
class RawArtifact(String):
    """String property used to carry the raw (packed) content of an Artifact.

    Bound to ``RawArtifactType`` from the artifact binding module and
    declares one additional typed field, ``byte_order``.
    """

    _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2'
    _binding_class = artifact_binding.RawArtifactType

    byte_order = cybox.TypedField("byte_order")
class Artifact(ObjectProperties):
    """An Artifact object: a blob of data plus the packaging applied to it.

    The content can be supplied either unpacked (``data``) or already packed
    (``packed_data``), but not both.  ``packaging`` is an ordered list of
    layers; ``packed_data`` applies each layer's ``pack`` in list order and
    ``data`` applies each layer's ``unpack`` in reverse order.
    """
    # Warning: Do not attempt to get or set Raw_Artifact directly. Use `data`
    # or `packed_data` respectively. Raw_Artifact will be set on export.
    _binding = artifact_binding
    _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2'
    _XSI_NS = "ArtifactObj"
    _XSI_TYPE = "ArtifactObjectType"

    TYPE_FILE = "File"
    TYPE_MEMORY = "Memory Region"
    TYPE_FILE_SYSTEM = "File System Fragment"
    TYPE_NETWORK = "Network Traffic"
    TYPE_GENERIC = "Generic Data Region"

    def __init__(self, data=None, type_=None):
        """Create an Artifact holding unpacked ``data`` of the given type.

        Args:
            data: the unpacked artifact content, or None.
            type_: the artifact type (see the ``TYPE_*`` constants), or None.
        """
        super(Artifact, self).__init__()
        self.type_ = type_
        self.packaging = []
        # `_packed_data` must exist before `data` is assigned: the `data`
        # setter inspects it.
        self._packed_data = None
        self.data = data

    @property
    def data(self):
        """The unpacked content.

        Returns the stored data if set; otherwise the stored packed data run
        through every packaging layer's ``unpack`` in reverse order; otherwise
        None.
        """
        if self._data:
            return self._data
        if not self._packed_data:
            return None

        unpacked = self._packed_data
        for layer in reversed(self.packaging):
            unpacked = layer.unpack(unpacked)
        return unpacked

    @data.setter
    def data(self, value):
        """Store unpacked content.

        Raises:
            ValueError: if packed data has already been set.
        """
        if self._packed_data:
            raise ValueError("packed_data already set, can't set data")
        self._data = value

    @property
    def packed_data(self):
        """The packed content.

        Returns the stored packed data if set; otherwise the stored data run
        through every packaging layer's ``pack`` in list order; otherwise
        None.
        """
        if self._packed_data:
            return self._packed_data
        if not self._data:
            return None

        packed = self._data
        for layer in self.packaging:
            packed = layer.pack(packed)
        return packed

    @packed_data.setter
    def packed_data(self, value):
        """Store already-packed content.

        Raises:
            ValueError: if unpacked data has already been set.
        """
        if self._data:
            raise ValueError("data already set, can't set packed_data")
        self._packed_data = value

    def to_obj(self, return_obj=None, ns_info=None):
        """Build an ``ArtifactObjectType`` binding object for this Artifact.

        Note that ``return_obj`` is accepted but not used; a fresh binding
        object is always created.

        Raises:
            ValueError: if ``packaging`` contains a layer that is not a
                Compression, Encryption or Encoding.
        """
        self._collect_ns_info(ns_info)

        artifact_obj = artifact_binding.ArtifactObjectType()
        super(Artifact, self).to_obj(return_obj=artifact_obj, ns_info=ns_info)

        if self.packaging:
            packaging_obj = artifact_binding.PackagingType()
            for layer in self.packaging:
                layer_obj = layer.to_obj(ns_info=ns_info)
                if isinstance(layer, Compression):
                    packaging_obj.add_Compression(layer_obj)
                elif isinstance(layer, Encryption):
                    packaging_obj.add_Encryption(layer_obj)
                elif isinstance(layer, Encoding):
                    packaging_obj.add_Encoding(layer_obj)
                else:
                    raise ValueError("Unsupported Packaging Type: %s" %
                                     type(layer))
            artifact_obj.Packaging = packaging_obj

        # Evaluate the property once: each access may re-run every packaging
        # layer over the data.
        packed = self.packed_data
        if packed:
            artifact_obj.Raw_Artifact = RawArtifact(packed).to_obj(ns_info=ns_info)

        artifact_obj.type_ = self.type_
        return artifact_obj

    def to_dict(self):
        """Return a dictionary representation of this Artifact.

        The keys 'packaging', 'raw_artifact' and 'type' are only added when
        the corresponding value is truthy.
        """
        artifact_dict = {}
        super(Artifact, self).to_dict(artifact_dict)

        if self.packaging:
            artifact_dict['packaging'] = [p.to_dict() for p in self.packaging]

        # Evaluate the property once: each access may re-run every packaging
        # layer over the data.
        packed = self.packed_data
        if packed:
            artifact_dict['raw_artifact'] = RawArtifact(packed).to_dict()

        if self.type_:
            artifact_dict['type'] = self.type_
        return artifact_dict

    @staticmethod
    def from_obj(artifact_obj):
        """Build an Artifact from an artifact binding object.

        Packaging layers are appended grouped by kind: all Compression
        entries, then all Encryption entries, then all Encoding entries.

        Returns:
            A new Artifact, or None if ``artifact_obj`` is falsy.
        """
        if not artifact_obj:
            return None

        artifact = Artifact()
        ObjectProperties.from_obj(artifact_obj, artifact)

        packaging = artifact_obj.Packaging
        if packaging:
            for compression in packaging.Compression:
                artifact.packaging.append(Compression.from_obj(compression))
            for encryption in packaging.Encryption:
                artifact.packaging.append(Encryption.from_obj(encryption))
            for encoding in packaging.Encoding:
                artifact.packaging.append(Encoding.from_obj(encoding))

        raw_artifact = artifact_obj.Raw_Artifact
        if raw_artifact:
            artifact.packed_data = RawArtifact.from_obj(raw_artifact).value

        artifact.type_ = artifact_obj.type_
        return artifact

    @staticmethod
    def from_dict(artifact_dict):
        """Build an Artifact from its dictionary representation.

        Each entry of 'packaging' is dispatched on its 'packaging_type'
        value; entries whose type is not 'compression', 'encryption' or
        'encoding' are skipped.

        Returns:
            A new Artifact, or None if ``artifact_dict`` is falsy.
        """
        if not artifact_dict:
            return None

        artifact = Artifact()
        ObjectProperties.from_dict(artifact_dict, artifact)

        for layer in artifact_dict.get('packaging', []):
            packaging_type = layer.get('packaging_type')
            if packaging_type == "compression":
                artifact.packaging.append(Compression.from_dict(layer))
            elif packaging_type == "encryption":
                artifact.packaging.append(Encryption.from_dict(layer))
            elif packaging_type == "encoding":
                artifact.packaging.append(Encoding.from_dict(layer))

        raw_artifact = artifact_dict.get('raw_artifact')
        if raw_artifact:
            artifact.packed_data = RawArtifact.from_dict(raw_artifact).value

        artifact.type_ = artifact_dict.get('type')
        return artifact
class Packaging(cybox.Entity):
    """An individual packaging layer.

    Base class only: both ``pack`` and ``unpack`` raise NotImplementedError
    until a subclass overrides them.
    """

    _namespace = 'http://cybox.mitre.org/objects#ArtifactObject-2'

    def pack(self, data):
        """Apply this layer to ``data``; not implemented on the base class."""
        raise NotImplementedError()

    def unpack(self, packed_data):
        """Undo this layer on ``packed_data``; not implemented on the base class."""
        raise NotImplementedError()
class Compression(Packaging):
    """A Compression packaging layer

    Currently only zlib and bz2 are supported.
    Also, compression_mechanism_ref is not currently supported.
    """

    def __init__(self, compression_mechanism=None):
        super(Compression, self).__init__()
        self.compression_mechanism = compression_mechanism

    def to_obj(self, return_obj=None, ns_info=None):
        """Return a ``CompressionType`` binding object for this layer.

        ``return_obj`` is accepted but not used.  The mechanism is only
        copied onto the binding object when it is truthy.
        """
        self._collect_ns_info(ns_info)

        binding_obj = artifact_binding.CompressionType()
        mechanism = self.compression_mechanism
        if mechanism:
            binding_obj.compression_mechanism = mechanism
        return binding_obj

    def to_dict(self):
        """Return a dict tagged with packaging_type 'compression'."""
        layer = {'packaging_type': 'compression'}
        mechanism = self.compression_mechanism
        if mechanism:
            layer['compression_mechanism'] = mechanism
        return layer

    @staticmethod
    def from_obj(compression_obj):
        """Create the matching Compression subclass from a binding object."""
        return Compression.get_object(compression_obj.compression_mechanism)

    @staticmethod
    def from_dict(compression_dict):
        """Create the matching Compression subclass from a dictionary."""
        return Compression.get_object(
            compression_dict.get('compression_mechanism'))

    @staticmethod
    def get_object(mechanism):
        """Return a new compression layer for the named mechanism.

        Raises:
            ValueError: if ``mechanism`` is neither 'zlib' nor 'bz2'.
        """
        if mechanism == 'zlib':
            return ZlibCompression()
        if mechanism == "bz2":
            return Bz2Compression()
        raise ValueError("Unsupported compression mechanism: %s" % mechanism)
class ZlibCompression(Compression):
    """Compression layer whose mechanism is "zlib"."""

    def __init__(self):
        super(ZlibCompression, self).__init__(compression_mechanism="zlib")

    def pack(self, data):
        """Return ``data`` compressed with ``zlib.compress``."""
        compressed = zlib.compress(data)
        return compressed

    def unpack(self, packed_data):
        """Return ``packed_data`` decompressed with ``zlib.decompress``."""
        decompressed = zlib.decompress(packed_data)
        return decompressed
class Bz2Compression(Compression):
    """Compression layer whose mechanism is "bz2"."""

    def __init__(self):
        super(Bz2Compression, self).__init__(compression_mechanism="bz2")

    def pack(self, data):
        """Return ``data`` compressed with ``bz2.compress``."""
        compressed = bz2.compress(data)
        return compressed

    def unpack(self, packed_data):
        """Return ``packed_data`` decompressed with ``bz2.decompress``."""
        decompressed = bz2.decompress(packed_data)
        return decompressed
class Encryption(Packaging):
    """
    An encryption packaging layer.
    """

    def __init__(self, encryption_mechanism=None, encryption_key=None):
        super(Encryption, self).__init__()
        self.encryption_mechanism = encryption_mechanism
        self.encryption_key = encryption_key

    def to_obj(self, return_obj=None, ns_info=None):
        """Return an ``EncryptionType`` binding object for this layer.

        ``return_obj`` is accepted but not used.  Mechanism and key are only
        copied onto the binding object when they are truthy.
        """
        self._collect_ns_info(ns_info)

        binding_obj = artifact_binding.EncryptionType()
        # The binding attributes share their names with the instance ones.
        for attr in ('encryption_mechanism', 'encryption_key'):
            value = getattr(self, attr)
            if value:
                setattr(binding_obj, attr, value)
        return binding_obj

    def to_dict(self):
        """Return a dict tagged with packaging_type 'encryption'."""
        layer = {'packaging_type': 'encryption'}
        # The dictionary keys share their names with the instance attributes.
        for attr in ('encryption_mechanism', 'encryption_key'):
            value = getattr(self, attr)
            if value:
                layer[attr] = value
        return layer

    @staticmethod
    def from_obj(encryption_obj):
        """Create the matching Encryption subclass from a binding object."""
        return Encryption.get_object(encryption_obj.encryption_mechanism,
                                     encryption_obj.encryption_key)

    @staticmethod
    def from_dict(encryption_dict):
        """Create the matching Encryption subclass from a dictionary."""
        return Encryption.get_object(
            encryption_dict.get('encryption_mechanism'),
            encryption_dict.get('encryption_key'))

    @staticmethod
    def get_object(mechanism, key):
        """Return a new encryption layer for the named mechanism.

        Raises:
            ValueError: if ``mechanism`` is neither 'xor' nor
                'PasswordProtected'.
        """
        if mechanism == 'xor':
            return XOREncryption(key)
        if mechanism == 'PasswordProtected':
            return PasswordProtectedZipEncryption(key)
        raise ValueError("Unsupported encryption mechanism: %s" % mechanism)
def xor(data, key):
    """XOR every byte or character of ``data`` with a single integer key.

    Args:
        data: a byte string (``bytes``/``bytearray``) or a text string.
        key: the XOR key; anything accepted by ``int()``.

    Returns:
        ``bytes`` when ``data`` is a byte string, otherwise a text string
        built with ``chr``.  Applying the function twice with the same key
        gives back the original content.
    """
    key = int(key)
    if isinstance(data, (bytes, bytearray)):
        # Going through bytearray yields integers on both Python 2 and 3;
        # calling ord() on the items of a Python 3 bytes object would raise
        # TypeError because they are already ints.
        return bytes(bytearray(b ^ key for b in bytearray(data)))
    return ''.join([chr(ord(c) ^ key) for c in data])
class XOREncryption(Encryption):
    """Encryption layer whose mechanism is "xor".

    Packing and unpacking are the same operation: both pass the content and
    the stored key to ``xor``.
    """

    def __init__(self, key):
        super(XOREncryption, self).__init__("xor", key)

    def pack(self, data):
        """Return ``data`` XOR-ed with the encryption key."""
        key = self.encryption_key
        return xor(data, key)

    def unpack(self, packed_data):
        """Return ``packed_data`` XOR-ed with the encryption key."""
        key = self.encryption_key
        return xor(packed_data, key)
class PasswordProtectedZipEncryption(Encryption):
    """Encryption layer whose mechanism is "PasswordProtected" (a zip file).

    Only ``unpack`` is provided; ``pack`` is not overridden here.
    """

    def __init__(self, key):
        super(PasswordProtectedZipEncryption, self).__init__("PasswordProtected", key)

    # `pack` is not implemented

    def unpack(self, packed_data):
        """Extract the artifact data from a password-protected zip archive.

        Args:
            packed_data: the raw bytes of the zip archive.

        Returns:
            The content of the first member of the archive, decrypted with
            ``self.encryption_key``.
        """
        # io.BytesIO is available on both Python 2 and 3, whereas the
        # StringIO module only exists on Python 2.
        from io import BytesIO
        from zipfile import ZipFile

        password = self.encryption_key
        if password is not None and not isinstance(password, bytes):
            # zipfile requires a bytes password on Python 3.
            # NOTE(review): UTF-8 is assumed for text keys -- confirm.
            password = password.encode('utf-8')

        buf = BytesIO(packed_data)
        with ZipFile(buf, 'r') as myzip:
            # Assume there is only one member in the archive, and that it
            # contains the artifact data. Ignore the name.
            filename = myzip.namelist()[0]
            data = myzip.read(filename, password)
        return data
class Encoding(Packaging):
    """
    An encoding packaging layer.

    Currently only base64 with a standard alphabet is supported.
    """

    def to_obj(self, return_obj=None, ns_info=None):
        """Return an ``EncodingType`` binding object for this layer.

        ``return_obj`` is accepted but not used.
        """
        self._collect_ns_info(ns_info)
        # Defaults to "Base64" algorithm
        return artifact_binding.EncodingType()

    def to_dict(self):
        """Return a dict tagged with packaging_type 'encoding'."""
        return {
            'packaging_type': 'encoding',
            'algorithm': 'Base64',
        }

    @staticmethod
    def from_obj(encoding_obj):
        """Return a Base64Encoding, or None if ``encoding_obj`` is falsy."""
        if not encoding_obj:
            return None
        return Base64Encoding()

    @staticmethod
    def from_dict(encoding_dict):
        """Return a Base64Encoding, or None if ``encoding_dict`` is falsy."""
        if not encoding_dict:
            return None
        return Base64Encoding()
class Base64Encoding(Encoding):
    """Encoding layer backed by the standard-library ``base64`` module."""

    def pack(self, data):
        """Return ``data`` encoded with ``base64.b64encode``."""
        encoded = base64.b64encode(data)
        return encoded

    def unpack(self, packed_data):
        """Return ``packed_data`` decoded with ``base64.b64decode``."""
        decoded = base64.b64decode(packed_data)
        return decoded