't.type = :sendingType')
      ->setParameter('sendingType', 'sending')
      ->andWhere('tsub.subscriber = :subscriber')
      ->setParameter('subscriber', $subscriber);

    if ($dateTo) {
      $qb->andWhere('t.updatedAt < :dateTo')
        ->setParameter('dateTo', $dateTo);
    }
    if ($dateFrom) {
      $qb->andWhere('t.updatedAt > :dateFrom')
        ->setParameter('dateFrom', $dateFrom);
    }
    return $qb->getQuery()->getResult();
  }

  /**
   * Builds (without executing) the query used for campaign analytics.
   *
   * Each row is one sending queue (grouped by q.id) whose task has the
   * "completed" status and was processed within the last three months.
   * Selected columns:
   *  - newsletterType:  type of the joined newsletter
   *  - sendingQueueMeta: the queue's meta column
   *  - sentToSegment:   true when at least one joined segment of the dynamic type exists
   *  - sentLast7Days / sentLast30Days / sentLast3Months: whether the task's
   *    processedAt falls inside the respective window
   *
   * @return mixed the query object produced by the query builder's getQuery()
   */
  public function getCampaignAnalyticsQuery() {
    $sevenDaysAgo = Carbon::now()->subDays(7);
    $thirtyDaysAgo = Carbon::now()->subDays(30);
    $threeMonthsAgo = Carbon::now()->subMonths(3);

    return $this->doctrineRepository->createQueryBuilder('q')
      ->select(
        ' n.type as newsletterType,'
        . ' q.meta as sendingQueueMeta,'
        . ' CASE WHEN COUNT(s.id) > 0 THEN true ELSE false END as sentToSegment,'
        . ' CASE WHEN t.processedAt >= :sevenDaysAgo THEN true ELSE false END as sentLast7Days,'
        . ' CASE WHEN t.processedAt >= :thirtyDaysAgo THEN true ELSE false END as sentLast30Days,'
        . ' CASE WHEN t.processedAt >= :threeMonthsAgo THEN true ELSE false END as sentLast3Months'
      )
      ->join('q.task', 't')
      ->leftJoin('q.newsletter', 'n')
      ->leftJoin('n.newsletterSegments', 'ns')
      // only segments of the dynamic type are joined, so COUNT(s.id) counts just those
      ->leftJoin('ns.segment', 's', 'WITH', 's.type = :dynamicType')
      ->andWhere('t.status = :taskStatus')
      ->andWhere('t.processedAt >= :since')
      ->setParameter('sevenDaysAgo', $sevenDaysAgo)
      ->setParameter('thirtyDaysAgo', $thirtyDaysAgo)
      ->setParameter('threeMonthsAgo', $threeMonthsAgo)
      ->setParameter('dynamicType', SegmentEntity::TYPE_DYNAMIC)
      ->setParameter('taskStatus', ScheduledTaskEntity::STATUS_COMPLETED)
      // the overall window equals the widest flag window (three months)
      ->setParameter('since', $threeMonthsAgo)
      ->groupBy('q.id')
      ->getQuery();
  }

  /**
   * Marks the queue's task as paused, unless the queue is already fully processed.
   *
   * Nothing happens when the processed count equals the total count or when
   * the queue has no ScheduledTaskEntity attached.
   */
  public function pause(SendingQueueEntity $queue): void {
    if ($queue->getCountProcessed() !== $queue->getCountTotal()) {
      $task = $queue->getTask();
      if ($task instanceof ScheduledTaskEntity) {
        $task->setStatus(ScheduledTaskEntity::STATUS_PAUSED);
        $this->flush();
      }
    }
  }

  /**
   * Resumes a queue.
   *
   * - When every subscriber is already processed, the task is marked completed
   *   (processedAt set to the current WP time) and the newsletter is marked
   *   sent if it allows that.
   * - Otherwise the newsletter is switched back to "sending" and the task
   *   status is reset to null. A newsletter in the corrupt state additionally
   *   gets its rendered body cleared so that it is rendered again.
   *
   * Returns without changes when the queue has no task, or (in the second
   * case) no newsletter.
   */
  public function resume(SendingQueueEntity $queue): void {
    $task = $queue->getTask();
    if (!$task instanceof ScheduledTaskEntity) return;

    if ($queue->getCountProcessed() === $queue->getCountTotal()) {
      $processedAt = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
      $task->setProcessedAt($processedAt);
      $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
      // Update also status of newsletter if necessary
      $newsletter = $queue->getNewsletter();
      if ($newsletter instanceof NewsletterEntity && $newsletter->canBeSetSent()) {
        $newsletter->setStatus(NewsletterEntity::STATUS_SENT);
      }
      $this->flush();
    } else {
      $newsletter = $queue->getNewsletter();
      if (!$newsletter instanceof NewsletterEntity) return;
      if ($newsletter->getStatus() === NewsletterEntity::STATUS_CORRUPT) {
        // force a re-render
        $queue->setNewsletterRenderedBody(null);
        $this->persist($queue);
      }
      $newsletter->setStatus(NewsletterEntity::STATUS_SENDING);
      $task->setStatus(null);
      $this->flush();
    }
  }

  /**
   * Deletes every sending queue that belongs to the given task.
   */
  public function deleteByTask(ScheduledTaskEntity $scheduledTask): void {
    $this->entityManager->createQueryBuilder()
      ->delete(SendingQueueEntity::class, 'sq')
      ->where('sq.task = :task')
      ->setParameter('task', $scheduledTask)
      ->getQuery()
      ->execute();

    // delete was done via DQL, make sure the entities are also detached from the entity manager
    $this->detachAll(function (SendingQueueEntity $entity) use ($scheduledTask) {
      return $entity->getTask() === $scheduledTask;
    });
  }

  /**
   * Stores the campaign ID under the 'campaignId' key of the queue's meta and flushes.
   *
   * A meta value that is not an array is replaced by a fresh array.
   */
  public function saveCampaignId(SendingQueueEntity $queue, string $campaignId): void {
    $meta = $queue->getMeta();
    if (!is_array($meta)) {
      $meta = [];
    }
    $meta['campaignId'] = $campaignId;
    $queue->setMeta($meta);
    $this->flush();
  }

  /**
   * Stores a snapshot of the filter segment under the 'filterSegment' key of
   * the queue's meta and flushes.
   *
   * The snapshot holds the segment's id, name, updatedAt and, per dynamic
   * filter, its filter type, action, data and lookup data. When obtaining the
   * lookup data throws, the error is logged and 'lookupData' stays an empty
   * array for that filter; the remaining data is still saved.
   */
  public function saveFilterSegmentMeta(SendingQueueEntity $queue, SegmentEntity $filterSegmentEntity): void {
    $meta = $queue->getMeta();
    // same guard as saveCampaignId(): never write an offset into a non-array meta
    if (!is_array($meta)) {
      $meta = [];
    }

    $filters = array_map(function(DynamicSegmentFilterEntity $filterEntity) {
      $filter = $this->filterFactory->getFilterForFilterEntity($filterEntity);
      $data = $filterEntity->getFilterData();
      $filterData = [
        'filterType' => $data->getFilterType(),
        'action' => $data->getAction(),
        'data' => $data->getData(),
        'lookupData' => [],
      ];
      try {
        $filterData['lookupData'] = $filter->getLookupData($data);
      } catch (\Throwable $e) {
        // best effort: a failed lookup must not prevent saving the rest of the snapshot
        $this->loggerFactory->getLogger(LoggerFactory::TOPIC_SEGMENTS)->error("Failed to save lookup data for filter {$filterEntity->getId()}: {$e->getMessage()}");
      }
      return $filterData;
    }, $filterSegmentEntity->getDynamicFilters()->toArray());

    $meta['filterSegment'] = [
      'id' => $filterSegmentEntity->getId(),
      'name' => $filterSegmentEntity->getName(),
      'updatedAt' => $filterSegmentEntity->getUpdatedAt(),
      'filters' => $filters,
    ];
    $queue->setMeta($meta);
    $this->flush();
  }

  /**
   * Updates the processed / to-process counters of the queue and flushes.
   *
   * @param int|null $count Number of subscribers just processed. A non-zero
   *   value adjusts the counters arithmetically. Null and 0 both trigger a
   *   recount from the task's subscribers, which also resets the total.
   */
  public function updateCounts(SendingQueueEntity $queue, ?int $count = null): void {
    if ($count) {
      // increment/decrement counts based on known subscriber count, don't exceed the bounds
      $queue->setCountProcessed(min($queue->getCountProcessed() + $count, $queue->getCountTotal()));
      $queue->setCountToProcess(max($queue->getCountToProcess() - $count, 0));
    } else {
      // query DB to update counts, slower but more accurate, to be used if count isn't known
      $task = $queue->getTask();
      $processed = $task ? $this->scheduledTaskSubscribersRepository->countProcessed($task) : 0;
      $unprocessed = $task ? $this->scheduledTaskSubscribersRepository->countUnprocessed($task) : 0;
      $queue->setCountProcessed($processed);
      $queue->setCountToProcess($unprocessed);
      $queue->setCountTotal($processed + $unprocessed);
    }
    $this->entityManager->flush();
  }

  /**
   * Deletes every sending queue whose newsletter ID is in the given list.
   *
   * @param int[] $ids
   */
  public function deleteByNewsletterIds(array $ids): void {
    // nothing can match an empty list; skip the DELETE and the scan of managed entities
    if ($ids === []) return;

    $this->entityManager->createQueryBuilder()
      ->delete(SendingQueueEntity::class, 'q')
      ->where('q.newsletter IN (:ids)')
      ->setParameter('ids', $ids)
      ->getQuery()
      ->execute();

    // delete was done via DQL, make sure the entities are also detached from the entity manager
    $this->detachAll(function (SendingQueueEntity $entity) use ($ids) {
      $newsletter = $entity->getNewsletter();
      return $newsletter && in_array($newsletter->getId(), $ids, true);
    });
  }
}