Hi Peter!
From: Ihnat Peter | TSS Group a(dot)s(dot) <Ihnat(at)TSSGROUP(dot)sk>>
>> I got 2 problems:
>>
>> - Worker is receiving notifications from every channel not only the registered channel (in my case "foo")
>>
>> - Notifications are not logged in the server log - I cannot store the payloads for further work
>> Any help is welcomed.
>>
>> Here is the code:
>>
>> PG_MODULE_MAGIC;
>>
>> void _PG_init(void);
>> void _PG_fini(void);
>>
>> static volatile sig_atomic_t got_sigterm = false;
>> static volatile sig_atomic_t got_sigusr1 = false;
>> static char *notify_database = NULL;
>> static emit_log_hook_type prev_log_hook = NULL;
>>
>> static void
>> bgw_sigterm(SIGNAL_ARGS)
>> {
>> int save_errno = errno;
>> got_sigterm = true;
>> if (MyProc)
>> SetLatch(&MyProc->procLatch);
>> errno = save_errno;
>> }
>>
>> static void
>> bgw_sigusr1(SIGNAL_ARGS)
>> {
>> int save_errno = errno;
>> got_sigusr1 = true;
>> if (MyProc)
>> SetLatch(&MyProc->procLatch);
>> errno = save_errno;
>> }
>>
>> static void
>> notify_main(Datum main_arg)
>> {
>> pqsignal(SIGTERM, bgw_sigterm);
>> pqsignal(SIGUSR1, bgw_sigusr1);
>>
>> BackgroundWorkerUnblockSignals();
>> BackgroundWorkerInitializeConnection(notify_database, NULL);
>> EnableNotifyInterrupt();
>>
>> pgstat_report_activity(STATE_RUNNING, "background_worker");
>> StartTransactionCommand();
>> Async_Listen("foo");
>> CommitTransactionCommand();
>> pgstat_report_activity(STATE_IDLE, NULL);
>>
>> while (!got_sigterm)
>> {
>> int rc;
>>
>> rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 10000L);
>> ResetLatch(&MyProc->procLatch);
>>
>> if (rc & WL_POSTMASTER_DEATH)
>> proc_exit(1);
>>
>> if (got_sigusr1)
>> {
>> got_sigusr1 = false;
>> elog(INFO, " background_worker: notification received");
>> // DO SOME WORK WITH STORED NOTIFICATIONS
>> }
>>
>> }
>>
>> elog(LOG, "background_worker: finished");
>> proc_exit(0);
>> }
>>
>> static void
>> store_notification(ErrorData *edata)
>> {
>> // HERE STORE THE NOTIFICATION FROM SERVER LOG
>>
>> if (prev_log_hook)
>> (*prev_log_hook) (edata);
>> }
>>
>> void
>> _PG_init(void)
>> {
>> BackgroundWorker worker;
>> DefineCustomStringVariable("postgres", NULL, NULL, ¬ify_database,
>> "postgres",
>> PGC_POSTMASTER, 0, NULL, NULL, NULL);
>>
>> MemSet(&worker, 0, sizeof(BackgroundWorker));
>> snprintf(worker.bgw_name, BGW_MAXLEN, "background_worker");
>> worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
>> worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
>> worker.bgw_main = notify_main;
>> worker.bgw_restart_time = 10;
>> worker.bgw_main_arg = (Datum) 0;
>> worker.bgw_notify_pid = 0;
>> RegisterBackgroundWorker(&worker);
>>
>> prev_log_hook = emit_log_hook;
>> emit_log_hook = store_notification;
>> }
>>
>> void
>> _PG_fini(void)
>> {
>> emit_log_hook = prev_log_hook;
>> }
The solution to this problem would be also interesting for me. We have application which use sending data to background worker and it's look like the asynchronous notification can be ideal solution. But we do not found solution how to work with this notifications. Worker calculating data and update some tables. Amount of data is too big and therefore we can’t calculate and update tables only in fixed period.
If the notification can’t be used for this, then it is possible to use other method how to send data to worker or wake up worker?
If the notification can’t be used for this, then it is possible to use other method how to send data to worker or wake up worker?
Best regards,
Peter K.
Peter K.
From: Ihnat Peter | TSS Group a(dot)s(dot) <Ihnat(at)TSSGROUP(dot)sk>>
>> I got 2 problems:
>>
>> - Worker is receiving notifications from every channel not only the registered channel (in my case "foo")
>>
>> - Notifications are not logged in the server log - I cannot store the payloads for further work
>> Any help is welcomed.
>>
>> Here is the code:
>>
>> PG_MODULE_MAGIC;
>>
>> void _PG_init(void);
>> void _PG_fini(void);
>>
>> static volatile sig_atomic_t got_sigterm = false;
>> static volatile sig_atomic_t got_sigusr1 = false;
>> static char *notify_database = NULL;
>> static emit_log_hook_type prev_log_hook = NULL;
>>
>> static void
>> bgw_sigterm(SIGNAL_ARGS)
>> {
>> int save_errno = errno;
>> got_sigterm = true;
>> if (MyProc)
>> SetLatch(&MyProc->procLatch);
>> errno = save_errno;
>> }
>>
>> static void
>> bgw_sigusr1(SIGNAL_ARGS)
>> {
>> int save_errno = errno;
>> got_sigusr1 = true;
>> if (MyProc)
>> SetLatch(&MyProc->procLatch);
>> errno = save_errno;
>> }
>>
>> static void
>> notify_main(Datum main_arg)
>> {
>> pqsignal(SIGTERM, bgw_sigterm);
>> pqsignal(SIGUSR1, bgw_sigusr1);
>>
>> BackgroundWorkerUnblockSignals();
>> BackgroundWorkerInitializeConnection(notify_database, NULL);
>> EnableNotifyInterrupt();
>>
>> pgstat_report_activity(STATE_RUNNING, "background_worker");
>> StartTransactionCommand();
>> Async_Listen("foo");
>> CommitTransactionCommand();
>> pgstat_report_activity(STATE_IDLE, NULL);
>>
>> while (!got_sigterm)
>> {
>> int rc;
>>
>> rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 10000L);
>> ResetLatch(&MyProc->procLatch);
>>
>> if (rc & WL_POSTMASTER_DEATH)
>> proc_exit(1);
>>
>> if (got_sigusr1)
>> {
>> got_sigusr1 = false;
>> elog(INFO, " background_worker: notification received");
>> // DO SOME WORK WITH STORED NOTIFICATIONS
>> }
>>
>> }
>>
>> elog(LOG, "background_worker: finished");
>> proc_exit(0);
>> }
>>
>> static void
>> store_notification(ErrorData *edata)
>> {
>> // HERE STORE THE NOTIFICATION FROM SERVER LOG
>>
>> if (prev_log_hook)
>> (*prev_log_hook) (edata);
>> }
>>
>> void
>> _PG_init(void)
>> {
>> BackgroundWorker worker;
>> DefineCustomStringVariable("postgres", NULL, NULL, ¬ify_database,
>> "postgres",
>> PGC_POSTMASTER, 0, NULL, NULL, NULL);
>>
>> MemSet(&worker, 0, sizeof(BackgroundWorker));
>> snprintf(worker.bgw_name, BGW_MAXLEN, "background_worker");
>> worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
>> worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
>> worker.bgw_main = notify_main;
>> worker.bgw_restart_time = 10;
>> worker.bgw_main_arg = (Datum) 0;
>> worker.bgw_notify_pid = 0;
>> RegisterBackgroundWorker(&worker);
>>
>> prev_log_hook = emit_log_hook;
>> emit_log_hook = store_notification;
>> }
>>
>> void
>> _PG_fini(void)
>> {
>> emit_log_hook = prev_log_hook;
>> }