npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@qq-framework/task-manager

v1.0.0

Published

Projeto Gerenciador de Tarefas das Lojas Quero-Quero

Downloads

4

Readme

Como reprocessar tarefas de forma automática

O processo de reprocessamento é uma funcionalidade que permite a execução de tarefas de forma controlada, com tentativas repetidas em caso de falha. Ele é útil para tarefas que podem falhar temporariamente devido a problemas de rede, serviços externos indisponíveis, entre outros.

Pré-requisitos

Para utilizar o reprocessamento de tarefas, será necessário importar a model Reprocessamento nas entities do seu arquivo ormconfig.ts.

const entities = ['dist/src/modules/**/infra/models/*.model{.ts,.js}', ReprocessamentoModel]

export default {
    type: 'postgres',
    url: process.env.DATABASE_URL,
    entities,
    synchronize: false,
    logging: false,
    migrations: ['dist/src/shared/infra/typeorm/migrations/*.js'],
    cli: {
        migrationsDir: 'src/shared/infra/typeorm/migrations',
    },
}

Como usar

Atualmente há 2 formas de reprocessar tarefas de forma automática, são elas:

  1. Utilizando o decorator @RetryableEventPattern e o interceptor @RetryableEventPatternDlqInterceptor, ambos utilizam o Kafka para reprocessar as mensagens.

  2. Utilizando a classe ControleProcessamento e a classe ControleReprocessamento, que são classes que permitem reprocessar tarefas de forma automática, sem a necessidade de utilizar o Kafka (mas que também podem implementar envio de eventos).

1. Utilizando o decorator @RetryableEventPattern e o interceptor @RetryableEventPatternDlqInterceptor

Para utilizar o decorator @RetryableEventPattern e o interceptor @RetryableEventPatternDlqInterceptor, você precisa seguir 2 principais etapas, são elas:

Etapa 1: Criar o Processamento

Inserir o decorator @RetryableEventPattern no método que deseja reprocessar, informando os parâmetros necessários.

    @RetryableEventPattern(
        param1,
        param2,
        param3,
        param4
    )

O decorator RetryableEventPattern recebe quatro argumentos:

  1. Param1 -> topico: O nome do tópico do Kafka que será enviado a mensagem em caso de falha.

  2. Param2 -> quantidadeCiclos: O número máximo de ciclos de reprocessamento. O valor padrão é 1.

  3. Param3 -> quantidadeTentativasPorCiclo: O número máximo de tentativas por ciclo. O valor padrão é 3.

  4. Param4 -> processo: O nome do processo à ser executado.

Exemplo de uso
    @RetryableEventPattern(
        KafkaTopics.IMAGEM_PRODUTO_IMPORTADA_FALHA_TOPIC,
        1,
        3,
        'ImportacaoController.atualizarProdutoImagemFalha'
    )
    public async atualizarProdutoImagemFalha(
        @Payload() mensagem: AtualizarStatusImportacaoUseCaseProps,
        @Ctx() context: KafkaContext
    ): ResultAsync<any, void> {
        return await this.atualizarProdutoImagemUseCase.execute(mensagem)
    }

Nesse exemplo, o método atualizarProdutoImagemFalha será reprocessado uma vez, com até 3 tentativas por ciclo, e o processo será chamado ImportacaoController.atualizarProdutoImagemFalha.

OBS: É necessário que seja fornecido o KafkaContext nos parâmetros

Etapa 2: Criar o Reprocessamento

Inserir o interceptor @RetryableEventPatternDlqInterceptor no controller que deseja reprocessar, informando os parâmetros necessários.

    @UseInterceptors(RetryableEventPatternDlqInterceptor)

O interceptor RetryableEventPatternDlqInterceptor não recebe argumentos. Mas, para que o reprocessamento funcione, é necessário que o método reprocessar seja chamado periodicamente, no tempo que achar necessário e que a rota do endpoint seja chamada passando os seguintes parâmetros no body:

{
    "topico": "nome-do-topico",
    "processo": "nome-do-processo",
    "minutosEntreCiclos": 1
}

OBS: O valor padrão para o parâmetro minutosEntreCiclos é 1 minuto.

Exemplo de uso
    @Post('/teste')
    @UseInterceptors(RetryableEventPatternDlqInterceptor)
    public async atualizarProdutoImagemSucesso(@Body() props: PROPS_TYPE): Promise<HttpResponseError | HttpResponseOk> {
        return super.buildResponse({
            result: R.ok(),
        })
    }

Neste exemplo, para reprocessar o processo dado como exemplo anteriormente, a rota /teste será chamada da seguinte forma:

curl --location 'http://IP:PORTA/PATH/MODULO/teste' \
--header 'Content-Type: application/json' \
--data '{
    "topico": "nome-do-topico",
    "processo": "ImportacaoController.atualizarProdutoImagemFalha",
    "minutosEntreCiclos": 1
}'

2. Utilizando a classe ControleProcessamento e a classe ControleReprocessamento

Para utilizar o controle de processamento, você precisa seguir 2 principais etapas, são elas:

Etapa 1: Criar o Processamento

Há duas maneiras de criar um processamento, a primeira é utilizando o método processar e a segunda é utilizando o método processarComCallback. A diferença entre eles é que o método processar irá realizar a função passada como parâmetro novamente, de forma autómatica, o número de vezes padronizado. Enquanto o método processarComCallback permite que você defina um callback personalizado, que será executado quando a função principal falhar. Segue abaixo os exemplos de uso de ambos os métodos:

Método processar

Instanciar a classe ControleProcessamento informando os parâmetros e chamar o método processar.

const controle = new ControleProcessamento(param1, param2)

await controle.processar(param3, param4)

O classe ControleProcessamento recebe dois argumentos:

  1. Param1 -> processo: O nome do processo à ser executado

  2. Param2 -> parametros: Objeto que contém os parâmetros para o processamento.

O método processar recebe dois argumentos:

  1. Param3 -> props: Os dados que serão utilizados para chamar a callback

  2. Param4 -> callback: A função que será executada

Os parâmetros para o processamento são:

  • quantidadeCiclos: O número máximo de ciclos de reprocessamento.
  • quantidadeTentativasPorCiclo: O número máximo de tentativas por ciclo.
  • cicloAtual: O ciclo atual. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar um ciclo específico.
  • numeroTentativa: O número da tentativa atual. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar uma tentativa específica.
  • id: O id do processamento. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar um processamento específico.

OBS: Ao informar os parâmetros opcionais, tenha em mente que o processo poderá não seguir a ordem padrão de reprocessamento.

Exemplo:

const parametros = {
    quantidadeCiclos: 3,
    quantidadeTentativasPorCiclo: 3,
}
Exemplo de uso

Aqui está um exemplo de como usar o processo para atualizar o estoque em um serviço externo:

const controle = new ControleProcessamento(ParallelStockUpdaterServiceImpl.name, {
    quantidadeCiclos: 3,
    quantidadeTentativasPorCiclo: 3,
})

await controle.processar(payload.data, async (processamento: Processamento) => {
    const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
        skuId: processamento.props['idSkuVtex'],
        quantity: processamento.props['totalAvailableEcm'],
        warehouseId: processamento.props['idWarehouse'],
        branchId: processamento.props['idBranch'],
    })

    if (retornoVtex.isFailure()) {
        return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
    }

    return R.ok()
})

OBS: Neste exemplo, a tarefa é atualizar o inventário em um serviço externo. Se a atualização falhar, a tarefa será reprocessada até 3 vezes por ciclo, com até 3 ciclos, e um intervalo de 60 minutos entre os ciclos.

Método processarComCallback

A diferença entre o método processar e o método processarComCallback é que o método processarComCallback permite que você defina um callback personalizado, que será executado quando a função principal falhar. Portanto, a utilização é a mesma, com a diferença de que o método processarComCallback recebe um callback como parâmetro.

Exemplo de uso

Aqui está um exemplo de como usar o processo para atualizar o estoque em um serviço externo:

const controle = new ControleProcessamento(ParallelStockUpdaterServiceImpl.name, {
    quantidadeCiclos: 3,
    quantidadeTentativasPorCiclo: 3,
})

await controle.processar(
    payload.data,
    async (processamento: Processamento) => {
        const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
            skuId: processamento.props['idSkuVtex'],
            quantity: processamento.props['totalAvailableEcm'],
            warehouseId: processamento.props['idWarehouse'],
            branchId: processamento.props['idBranch'],
        })

        if (retornoVtex.isFailure()) {
            return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
        }

        return R.ok()
    },
    async (processamento: Processamento) => {
        // Neste callback, você pode fazer o que quiser, como enviar uma mensagem para um tópico do Kafka
        await this.kafkaProducerService.post({
            conteudo: JSON.stringify({
                correlationId: processamento.id,
                cycles: processamento.cicloAtual,
                retries: processamento.numeroTentativa,
                data: {
                    ...processamento.props,
                },
            }),
            chave: processamento.id,
            topico: KafkaTopics.REPROCESSED_STOCK,
            headers: headers,
        })

        return R.ok()
    }
)

Etapa 2: Criar o Reprocessamento

Quando um Processamento atinge o máximo de tentativas, ele é marcado como PENDENTE e salvo na tabela reprocessamento. Para reprocessar um Processamento marcado como PENDENTE, você deve criar um novo método no controller que sejá chamado periodicamente, no tempo que achar necessário.

OBS: É interessante que não sejam executados mais 1 reprocessamento ao mesmo tempo, pois pode acabar gerando concorência. Para isso, é possível utilizar as configurações do AbstractUseCase, definindo o processo com singleThread

Esse método deve chamar algum processo que instâncie a classe ControleReprocessamento e chamar o método reprocessar,

const controle = new ControleReprocessamento(param1, param2)
await controle.reprocessar(param3)

A classe ControleReprocessamento recebe dois argumentos:

  1. Param1 -> processo: O nome do processo à ser executado

  2. Param2 -> minutosEntreCiclos: O número de minutos entre um ciclo e outro utilizado como filtro na query feita na tabela Reprocessamento, tendo como base o valor da coluna dt_criacao. O valor padrão é 1 minuto.

O método reprocessar recebe uma função de callback que será executada para cada Processamento marcado como PENDENTE.

OBS 1: O método reprocessar irá reprocessar todos os processamentos marcados como PENDENTE que estão dentro do intervalo de tempo definido pelo parâmetro minutosEntreCiclos.

Exemplo de uso

Aqui está um exemplo de como usar o reprocessamento para o mesmo processo de atualização de estoque em um serviço externo:

    @Post('/reprocessar-teste')

    async teste(): Promise<HttpResponseOk | HttpResponseError> {
        const controle = new ControleReprocessamento(
            ParallelStockUpdaterServiceImpl.name,
            1
        )

        await controle.reprocessar(
            async (processamento: Processamento) => {
                const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
                    skuId: processamento.props['idSkuVtex'],
                    quantity: processamento.props['totalAvailableEcm'],
                    warehouseId: processamento.props['idWarehouse'],
                    branchId: processamento.props['idBranch'],
                })

                if (retornoVtex.isFailure()) {
                    return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
                }

                return R.ok()
            }
        )

        return super.buildResponse({
            result: R.ok(`Reprocessamento iniciado!`),
            successStatusCode: 202,
        })
    }

Exceções

Existem três exceções que podem ser lançadas dentro da função de callback e que são tratadas pelo processo de reprocessamento:

  1. ErroProcessamento: Esta exceção deve ser lançada quando ocorrer um erro que pode ser resolvido automaticamente. O processo de reprocessamento irá tentar novamente. Se esta exceção for lançada, o processo será marcado como PENDENTE e será reprocessado conforme o parâmetro minutosEntreCiclos.
  2. ProcessamentoIrrecuperavelException: Esta exceção deve ser lançada quando ocorrer um erro que não pode ser resolvido automaticamente. O processo de reprocessamento irá parar e o erro será retornado. Se esta exceção for lançada, o processo será marcado como ERRO_PERMANENTE e não será reprocessado.
  3. CicloInvalidoException: Esta exceção deve ser lançada quando ocorrer um erro que não pode ser resolvido automaticamente neste ciclo. O processo de reprocessamento irá parar e o erro será retornado. Se esta exceção for lançada, o ciclo atual será ignorado E O processo será marcado como PENDENTE e reprocessado conforme o parâmetro minutosEntreCiclos.