@qq-framework/task-manager
v1.0.0
Published
Projeto Gerenciador de Tarefas das Lojas Quero-Quero
Downloads
4
Keywords
Readme
Como reprocessar tarefas de forma automática
O processo de reprocessamento é uma funcionalidade que permite a execução de tarefas de forma controlada, com tentativas repetidas em caso de falha. Ele é útil para tarefas que podem falhar temporariamente devido a problemas de rede, serviços externos indisponíveis, entre outros.
Pré-requisitos
Para utilizar o reprocessamento de tarefas, será necessário importar a model Reprocessamento
nas entities do seu arquivo ormconfig.ts
.
const entities = ['dist/src/modules/**/infra/models/*.model{.ts,.js}', ReprocessamentoModel]
export default {
type: 'postgres',
url: process.env.DATABASE_URL,
entities,
synchronize: false,
logging: false,
migrations: ['dist/src/shared/infra/typeorm/migrations/*.js'],
cli: {
migrationsDir: 'src/shared/infra/typeorm/migrations',
},
}
Como usar
Atualmente há 2 formas de reprocessar tarefas de forma automática, são elas:
Utilizando o decorator
@RetryableEventPattern
e o interceptor@RetryableEventPatternDlqInterceptor
, ambos utilizam o Kafka para reprocessar as mensagens.Utilizando a classe
ControleProcessamento
e a classeControleReprocessamento
, que são classes que permitem reprocessar tarefas de forma automática, sem a necessidade de utilizar o Kafka (mas que também podem implementar envio de eventos).
1. Utilizando o decorator @RetryableEventPattern
e o interceptor @RetryableEventPatternDlqInterceptor
Para utilizar o decorator @RetryableEventPattern
e o interceptor @RetryableEventPatternDlqInterceptor
, você precisa seguir 2 principais etapas, são elas:
Etapa 1: Criar o Processamento
Inserir o decorator @RetryableEventPattern
no método que deseja reprocessar, informando os parâmetros necessários.
@RetryableEventPattern(
param1,
param2,
param3,
param4
)
O decorator RetryableEventPattern
recebe quatro argumentos:
Param1
-> topico: O nome do tópico do Kafka que será enviado a mensagem em caso de falha.Param2
-> quantidadeCiclos: O número máximo de ciclos de reprocessamento. O valor padrão é 1.Param3
-> quantidadeTentativasPorCiclo: O número máximo de tentativas por ciclo. O valor padrão é 3.Param4
-> processo: O nome do processo à ser executado.
Exemplo de uso
@RetryableEventPattern(
KafkaTopics.IMAGEM_PRODUTO_IMPORTADA_FALHA_TOPIC,
1,
3,
'ImportacaoController.atualizarProdutoImagemFalha'
)
public async atualizarProdutoImagemFalha(
@Payload() mensagem: AtualizarStatusImportacaoUseCaseProps,
@Ctx() context: KafkaContext
): ResultAsync<any, void> {
return await this.atualizarProdutoImagemUseCase.execute(mensagem)
}
Nesse exemplo, o método atualizarProdutoImagemFalha
será reprocessado uma vez, com até 3 tentativas por ciclo, e o processo será chamado ImportacaoController.atualizarProdutoImagemFalha
.
OBS: É necessário que seja fornecido o KafkaContext nos parâmetros
Etapa 2: Criar o Reprocessamento
Inserir o interceptor @RetryableEventPatternDlqInterceptor
no controller que deseja reprocessar, informando os parâmetros necessários.
@UseInterceptors(RetryableEventPatternDlqInterceptor)
O interceptor RetryableEventPatternDlqInterceptor
não recebe argumentos. Mas, para que o reprocessamento funcione, é necessário que o método reprocessar
seja chamado periodicamente, no tempo que achar necessário e que a rota do endpoint seja chamada passando os seguintes parâmetros no body:
{
"topico": "nome-do-topico",
"processo": "nome-do-processo",
"minutosEntreCiclos": 1
}
OBS: O valor padrão para o parâmetro minutosEntreCiclos
é 1 minuto.
Exemplo de uso
@Post('/teste')
@UseInterceptors(RetryableEventPatternDlqInterceptor)
public async atualizarProdutoImagemSucesso(@Body() props: PROPS_TYPE): Promise<HttpResponseError | HttpResponseOk> {
return super.buildResponse({
result: R.ok(),
})
}
Neste exemplo, para reprocessar o processo dado como exemplo anteriormente, a rota /teste
será chamada da seguinte forma:
curl --location 'http://IP:PORTA/PATH/MODULO/teste' \
--header 'Content-Type: application/json' \
--data '{
"topico": "nome-do-topico",
"processo": "ImportacaoController.atualizarProdutoImagemFalha",
"minutosEntreCiclos": 1
}'
2. Utilizando a classe ControleProcessamento
e a classe ControleReprocessamento
Para utilizar o controle de processamento, você precisa seguir 2 principais etapas, são elas:
Etapa 1: Criar o Processamento
Há duas maneiras de criar um processamento, a primeira é utilizando o método processar
e a segunda é utilizando o método processarComCallback
. A diferença entre eles é que o método processar
irá realizar a função passada como parâmetro novamente, de forma autómatica, o número de vezes padronizado. Enquanto o método processarComCallback
permite que você defina um callback personalizado, que será executado quando a função principal falhar. Segue abaixo os exemplos de uso de ambos os métodos:
Método processar
Instanciar a classe ControleProcessamento
informando os parâmetros e chamar o método processar
.
const controle = new ControleProcessamento(param1, param2)
await controle.processar(param3, param4)
O classe ControleProcessamento
recebe dois argumentos:
Param1
-> processo: O nome do processo à ser executadoParam2
-> parametros: Objeto que contém os parâmetros para o processamento.
O método processar
recebe dois argumentos:
Param3
-> props: Os dados que serão utilizados para chamar acallback
Param4
-> callback: A função que será executada
Os parâmetros para o processamento são:
- quantidadeCiclos: O número máximo de ciclos de reprocessamento.
- quantidadeTentativasPorCiclo: O número máximo de tentativas por ciclo.
- cicloAtual: O ciclo atual. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar um ciclo específico.
- numeroTentativa: O número da tentativa atual. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar uma tentativa específica.
- id: O id do processamento. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar um processamento específico.
OBS: Ao informar os parâmetros opcionais, tenha em mente que o processo poderá não seguir a ordem padrão de reprocessamento.
Exemplo:
const parametros = {
quantidadeCiclos: 3,
quantidadeTentativasPorCiclo: 3,
}
Exemplo de uso
Aqui está um exemplo de como usar o processo para atualizar o estoque em um serviço externo:
const controle = new ControleProcessamento(ParallelStockUpdaterServiceImpl.name, {
quantidadeCiclos: 3,
quantidadeTentativasPorCiclo: 3,
})
await controle.processar(payload.data, async (processamento: Processamento) => {
const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
skuId: processamento.props['idSkuVtex'],
quantity: processamento.props['totalAvailableEcm'],
warehouseId: processamento.props['idWarehouse'],
branchId: processamento.props['idBranch'],
})
if (retornoVtex.isFailure()) {
return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
}
return R.ok()
})
OBS: Neste exemplo, a tarefa é atualizar o inventário em um serviço externo. Se a atualização falhar, a tarefa será reprocessada até 3 vezes por ciclo, com até 3 ciclos, e um intervalo de 60 minutos entre os ciclos.
Método processarComCallback
A diferença entre o método processar
e o método processarComCallback
é que o método processarComCallback
permite que você defina um callback personalizado, que será executado quando a função principal falhar. Portanto, a utilização é a mesma, com a diferença de que o método processarComCallback
recebe um callback como parâmetro.
Exemplo de uso
Aqui está um exemplo de como usar o processo para atualizar o estoque em um serviço externo:
const controle = new ControleProcessamento(ParallelStockUpdaterServiceImpl.name, {
quantidadeCiclos: 3,
quantidadeTentativasPorCiclo: 3,
})
await controle.processar(
payload.data,
async (processamento: Processamento) => {
const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
skuId: processamento.props['idSkuVtex'],
quantity: processamento.props['totalAvailableEcm'],
warehouseId: processamento.props['idWarehouse'],
branchId: processamento.props['idBranch'],
})
if (retornoVtex.isFailure()) {
return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
}
return R.ok()
},
async (processamento: Processamento) => {
// Neste callback, você pode fazer o que quiser, como enviar uma mensagem para um tópico do Kafka
await this.kafkaProducerService.post({
conteudo: JSON.stringify({
correlationId: processamento.id,
cycles: processamento.cicloAtual,
retries: processamento.numeroTentativa,
data: {
...processamento.props,
},
}),
chave: processamento.id,
topico: KafkaTopics.REPROCESSED_STOCK,
headers: headers,
})
return R.ok()
}
)
Etapa 2: Criar o Reprocessamento
Quando um Processamento atinge o máximo de tentativas, ele é marcado como PENDENTE
e salvo na tabela reprocessamento
. Para reprocessar um Processamento marcado como PENDENTE
, você deve criar um novo método no controller que sejá chamado periodicamente, no tempo que achar necessário.
OBS: É interessante que não sejam executados mais 1 reprocessamento ao mesmo tempo, pois pode acabar gerando concorência. Para isso, é possível utilizar as configurações do AbstractUseCase
, definindo o processo com singleThread
Esse método deve chamar algum processo que instâncie a classe ControleReprocessamento
e chamar o método reprocessar
,
const controle = new ControleReprocessamento(param1, param2)
await controle.reprocessar(param3)
A classe ControleReprocessamento
recebe dois argumentos:
Param1
-> processo: O nome do processo à ser executadoParam2
-> minutosEntreCiclos: O número de minutos entre um ciclo e outro utilizado como filtro na query feita na tabela Reprocessamento, tendo como base o valor da colunadt_criacao
. O valor padrão é 1 minuto.
O método reprocessar
recebe uma função de callback que será executada para cada Processamento marcado como PENDENTE
.
OBS 1: O método reprocessar
irá reprocessar todos os processamentos marcados como PENDENTE
que estão dentro do intervalo de tempo definido pelo parâmetro minutosEntreCiclos
.
Exemplo de uso
Aqui está um exemplo de como usar o reprocessamento para o mesmo processo de atualização de estoque em um serviço externo:
@Post('/reprocessar-teste')
async teste(): Promise<HttpResponseOk | HttpResponseError> {
const controle = new ControleReprocessamento(
ParallelStockUpdaterServiceImpl.name,
1
)
await controle.reprocessar(
async (processamento: Processamento) => {
const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
skuId: processamento.props['idSkuVtex'],
quantity: processamento.props['totalAvailableEcm'],
warehouseId: processamento.props['idWarehouse'],
branchId: processamento.props['idBranch'],
})
if (retornoVtex.isFailure()) {
return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
}
return R.ok()
}
)
return super.buildResponse({
result: R.ok(`Reprocessamento iniciado!`),
successStatusCode: 202,
})
}
Exceções
Existem três exceções que podem ser lançadas dentro da função de callback e que são tratadas pelo processo de reprocessamento:
ErroProcessamento
: Esta exceção deve ser lançada quando ocorrer um erro que pode ser resolvido automaticamente. O processo de reprocessamento irá tentar novamente. Se esta exceção for lançada, o processo será marcado comoPENDENTE
e será reprocessado conforme o parâmetrominutosEntreCiclos
.ProcessamentoIrrecuperavelException
: Esta exceção deve ser lançada quando ocorrer um erro que não pode ser resolvido automaticamente. O processo de reprocessamento irá parar e o erro será retornado. Se esta exceção for lançada, o processo será marcado comoERRO_PERMANENTE
e não será reprocessado.CicloInvalidoException
: Esta exceção deve ser lançada quando ocorrer um erro que não pode ser resolvido automaticamente neste ciclo. O processo de reprocessamento irá parar e o erro será retornado. Se esta exceção for lançada, o ciclo atual será ignorado E O processo será marcado comoPENDENTE
e reprocessado conforme o parâmetrominutosEntreCiclos
.