Source code for ECAgent.Decode

import json
import sys

from ECAgent.Core import Model


[docs]class IDecodable: """This class serves as the base class for Decode submodel. By inheriting from this class your Components, Systems and Agents can be decoded from a text file and built into an executable model In order to implement this class, the decode() static function must be overwritten.""" @staticmethod def decode(params: dict): raise NotImplementedError("Call to IDecodable.decode() not allowed.")
[docs]class Decoder: """Base decoder class: ECAgent decoders follow a specific decoding process. The process, as executed by the decoder, is as follows: * pre_model_decode(func, params) * model_init(params) * for each system in 'systems': * pre_system_init(func, system, params) * system_init(system, params) * post_system_init(func, system, params) * for each agent in 'agents' * pre_agent_init(func, agent, params) * agent_init(agent, params) * post_agent_init(func, agent, params) * post_model_decode(func, params) All pre and post methods invoke the method specified to func and supply the method with 'params' as an argument. If you want to create your own decoder, override the open_file function such that it creates a dictionary with the following content: { "model" : { "name" : Model_Name, "module(Optional)" : Module_Name, "params" : { params_1 : value_1, ..., params_n : value_n } }, "systems": [ system_1, ..., system_n, ], "agents" : [ agent_type_1, ..., agent_type_n ], pre_model_decode(Optional): { "func" : FunctionName, "module(Optional)" : ModuleName, "params" : { params_1 : value_1, ..., params_n : value_n } }, post_model_decode(Optional): { "func" : FunctionName, "module(Optional)" : ModuleName, "params" : { params_1 : value_1, ..., params_n : value_n } } } where both 'system' and 'agent_type' follow the same structure as 'model' """ @staticmethod def str_to_class(class_name: str, module_name: str): return getattr(sys.modules[module_name], class_name, None) @staticmethod def str_to_func(func_name: str, module_name: str): return getattr(sys.modules[module_name], func_name, None) @staticmethod def get_module_name(d: dict, false_return: str = '__main__') -> str: return d['module'] if 'module' in d else false_return
[docs] def open_file(self, file_name: str) -> dict: """You must overwrite this function if you make your own decoder""" raise NotImplementedError('You cannot invoke the open_file() method of the base decoder class')
def decode(self, file_path: str) -> Model: # Get decode dictionary data = self.open_file(file_path) if data is None: raise Exception('Unable to open file %s for decoding' % file_path) # Invoke pre_model_decode if 'pre_model_decode' in data: func = Decoder.str_to_func(data['pre_model_decode']['func'], Decoder.get_module_name(data['pre_model_decode'])) func(data['pre_model_decode']['params']) # Create Base Model generatedModel = Decoder.str_to_class(data['model']['name'], Decoder.get_module_name(data['model'])).decode( data['model']['params'] ) for systemDict in data['systems']: # Invoke pre_system_init if 'pre_system_init' in systemDict: func = Decoder.str_to_func(systemDict['pre_system_init']['func'], Decoder.get_module_name(systemDict['pre_system_init'])) systemDict['pre_system_init']['params']['model'] = generatedModel func(systemDict['pre_system_init']['params']) # Add generated model to the systemDict systemDict['params']['model'] = generatedModel # Create system generatedModel.systems.add_system( Decoder.str_to_class(systemDict['name'], Decoder.get_module_name(systemDict)).decode( systemDict['params'] ) ) # Invoke post_system_init if 'post_system_init' in systemDict: func = Decoder.str_to_func(systemDict['post_system_init']['func'], Decoder.get_module_name(systemDict['post_system_init'])) systemDict['post_system_init']['params']['model'] = generatedModel func(systemDict['post_system_init']['params']) for agentDict in data['agents']: # Invoke pre_agent_init if 'pre_agent_init' in agentDict: func = Decoder.str_to_func(agentDict['pre_agent_init']['func'], Decoder.get_module_name(agentDict['pre_agent_init'])) agentDict['pre_agent_init']['params']['model'] = generatedModel func(agentDict['pre_agent_init']['params']) # Add reference to the model in systemDict so that it can be used when creating the agents agentDict['params']['model'] = generatedModel # Create agents for i in range(0, agentDict['number']): # Add the index of the agent to the agentDict agentDict['params']['agent_index'] = i generatedModel.environment.add_agent( Decoder.str_to_class(agentDict['name'], Decoder.get_module_name(agentDict)).decode( agentDict['params'] ) ) if 'post_agent_init' in agentDict: func = Decoder.str_to_func(agentDict['post_agent_init']['func'], Decoder.get_module_name(agentDict['post_agent_init'])) agentDict['post_agent_init']['params']['model'] = generatedModel func(agentDict['post_agent_init']['params']) # Invoke post_model_decode if 'post_model_decode' in data: func = Decoder.str_to_func(data['post_model_decode']['func'], Decoder.get_module_name(data['post_model_decode'])) func(data['post_model_decode']['params']) return generatedModel
[docs]class JsonDecoder(Decoder):
[docs] def open_file(self, file_name: str) -> dict: with open(file_name) as json_file: return json.load(json_file)