Browse Source

feat(job-queue-plugin): Improve pub/sub message handling (#2561)

Catching invalid messages
Better logging about message handling
Fred Cox 2 năm trước cách đây
mục cha
commit
3645819244

+ 13 - 3
packages/job-queue-plugin/src/pub-sub/pub-sub-job-queue-strategy.ts

@@ -71,8 +71,9 @@ export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implement
         }
         }
 
 
         const subscription = this.subscription(queueName);
         const subscription = this.subscription(queueName);
-        const listener = (message: Message) => {
-            Logger.debug(`Received message: ${queueName}: ${message.id}`, loggerCtx);
+
+        const processMessage = async (message: Message) => {
+            Logger.verbose(`Received message: ${queueName}: ${message.id}`, loggerCtx);
 
 
             const job = new Job<Data>({
             const job = new Job<Data>({
                 id: message.id,
                 id: message.id,
@@ -84,12 +85,21 @@ export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implement
                 createdAt: message.publishTime,
                 createdAt: message.publishTime,
             });
             });
 
 
-            process(job)
+            await process(job);
+        };
+
+        const listener = (message: Message) => {
+            processMessage(message)
                 .then(() => {
                 .then(() => {
                     message.ack();
                     message.ack();
+                    Logger.verbose(`Finished handling: ${queueName}: ${message.id}`, loggerCtx);
                 })
                 })
                 .catch(err => {
                 .catch(err => {
                     message.nack();
                     message.nack();
+                    Logger.error(
+                        `Error handling: ${queueName}: ${message.id}: ${String(err.message)}`,
+                        loggerCtx,
+                    );
                 });
                 });
         };
         };
         this.listeners.set(queueName, process, listener);
         this.listeners.set(queueName, process, listener);